From 6fed0faa5cc144bb89365de128445725096c0c77 Mon Sep 17 00:00:00 2001 From: Paul Apostol Date: Mon, 25 Jun 2018 09:13:17 +0000 Subject: [PATCH 1/4] Add unsubscribe method. --- lib/latest/MQRouter.js | 23 ++++ lib/latest/QueuesRoutingTable.js | 20 ++++ lib/v6x/MQRouter.js | 147 ++++++++++++++++---------- lib/v6x/QueuesRoutingTable.js | 20 ++++ package-lock.json | 24 ++--- package.json | 2 +- test/MQRouter/Unsubscribe.js | 77 ++++++++++++++ test/MQRouter/index.js | 1 + test/QueuesRoutingTable/Unregister.js | 19 +++- 9 files changed, 261 insertions(+), 72 deletions(-) create mode 100644 test/MQRouter/Unsubscribe.js diff --git a/lib/latest/MQRouter.js b/lib/latest/MQRouter.js index 5f78d48..d911598 100644 --- a/lib/latest/MQRouter.js +++ b/lib/latest/MQRouter.js @@ -244,6 +244,29 @@ class MQRouter { } } + /** + * Unsubscribe from a queue + * @param {Object} mqProps mq subscription properties + * @returns {Boolean} returns true if unsubscribed + * @public + */ + async unsubscribe(mqProps) { + let consumerTag = null; + let index = null; + try { + ({ consumerTag, index } = this.queuesRoutingTable.getHandlerRefsByProperties(mqProps)); + this.queuesRoutingTable.unregister({ index }); + return await this.connector.unsubscribe({ consumerTag }); + } catch (cause) { + debug(`error unsubscribing - ${cause.message}`); + throw Reflect.construct(IError, [{ + name: 'MQ_ROUTER_UNSUBSCRIBE', + source: `${this.sourceIdentifier}.unsubscribe`, + cause, + }]); + } + } + /** * Send request over mq * @param {Buffer} message message to be sent diff --git a/lib/latest/QueuesRoutingTable.js b/lib/latest/QueuesRoutingTable.js index ffc2b8d..9f48427 100644 --- a/lib/latest/QueuesRoutingTable.js +++ b/lib/latest/QueuesRoutingTable.js @@ -140,6 +140,26 @@ class QueuesRoutingTable { source: `${this.sourceIdentifier}.getHandlerByConsumerTag`, }]); } + + /** + * locate refs by properties + * @param {String} queue queue name + * @param {String} topic topic name + * @param {String} exchange exchange name + * @returns {Object} index and consumerTag + * @public + */ + getHandlerRefsByProperties({ queue, topic, exchange }) { + try { + const { consumerTag } = Object.values(this.handlers).find(handler => + handler.queue === queue && + handler.topic === topic && + handler.exchange === exchange); + return { consumerTag, index: this.consumerTags[consumerTag] }; + } catch (cause) { + return null; + } + } } module.exports = { diff --git a/lib/v6x/MQRouter.js b/lib/v6x/MQRouter.js index df7ec5d..71eb638 100644 --- a/lib/v6x/MQRouter.js +++ b/lib/v6x/MQRouter.js @@ -286,6 +286,37 @@ class MQRouter { })(); } + /** + * Unsubscribe from a queue + * @param {Object} mqProps mq subscription properties + * @returns {Boolean} returns true if unsubscribed + * @public + */ + unsubscribe(mqProps) { + var _this4 = this; + + return _asyncToGenerator(function* () { + let consumerTag = null; + let index = null; + try { + var _queuesRoutingTable$g = _this4.queuesRoutingTable.getHandlerRefsByProperties(mqProps); + + consumerTag = _queuesRoutingTable$g.consumerTag; + index = _queuesRoutingTable$g.index; + + _this4.queuesRoutingTable.unregister({ index }); + return yield _this4.connector.unsubscribe({ consumerTag }); + } catch (cause) { + debug(`error unsubscribing - ${cause.message}`); + throw Reflect.construct(IError, [{ + name: 'MQ_ROUTER_UNSUBSCRIBE', + source: `${_this4.sourceIdentifier}.unsubscribe`, + cause + }]); + } + })(); + } + /** * Send request over mq * @param {Buffer} message message to be sent @@ -305,12 +336,12 @@ class MQRouter { options = {}, isRequest = false }) { - var _this4 = this; + var _this5 = this; return _asyncToGenerator(function* () { - yield _this4.validateDestination(destination); + yield _this5.validateDestination(destination); - var _ref2 = yield _this4.buildRequest({ + var _ref2 = yield _this5.buildRequest({ message, version, replyTo @@ -320,17 +351,17 @@ class MQRouter { id = _ref2.id; const lOptions = { - ttl: options.ttl || _this4.defaultTTL + ttl: options.ttl || _this5.defaultTTL }; let response = true; if (isRequest) { - response = _this4.requestsRoutingTable.register({ + response = _this5.requestsRoutingTable.register({ options: lOptions, id }); } try { - yield _this4.connector.sendMessage(Object.assign({}, destination, { + yield _this5.connector.sendMessage(Object.assign({}, destination, { message: serializedMessage, options: lOptions })); @@ -338,7 +369,7 @@ class MQRouter { if (response === true) { throw error; } - _this4.requestsRoutingTable.callById({ + _this5.requestsRoutingTable.callById({ id, error }); @@ -389,11 +420,11 @@ class MQRouter { * @private */ waitForSelfSubscription() { - var _this5 = this; + var _this6 = this; return _asyncToGenerator(function* () { return new Promise(function (resolve, reject) { - _this5.mqrEvents.once('selfSubscribed', function ({ error }) { + _this6.mqrEvents.once('selfSubscribed', function ({ error }) { if (error) { return reject(error); } @@ -409,25 +440,25 @@ class MQRouter { * @private */ checkIfIsSelfSubscribedForResponses() { - var _this6 = this; + var _this7 = this; return _asyncToGenerator(function* () { - if (_this6.identification.subscribed) { + if (_this7.identification.subscribed) { return true; } - if (_this6.identification.subscribing) { - return _this6.waitForSelfSubscription(); + if (_this7.identification.subscribing) { + return _this7.waitForSelfSubscription(); } - _this6.identification.subscribing = true; + _this7.identification.subscribing = true; try { - var _ref3 = yield _this6.subscribe({ + var _ref3 = yield _this7.subscribe({ // this call will be covered in e2e tests handler: /* istanbul ignore next */function handler(...args) { - return _this6.ownHandler(...args); + return _this7.ownHandler(...args); }, - queue: _this6.identification.listen.queue, - topic: _this6.identification.listen.topic, - exchange: _this6.identification.listen.exchange, + queue: _this7.identification.listen.queue, + topic: _this7.identification.listen.topic, + exchange: _this7.identification.listen.exchange, options: { prefetch: 0, exclusive: true @@ -438,23 +469,23 @@ class MQRouter { queue = _ref3.queue; if (topic && topic !== '') { - _this6.returnDestination.queue = topic; + _this7.returnDestination.queue = topic; } else { - _this6.returnDestination.queue = queue; + _this7.returnDestination.queue = queue; } - _this6.identification.subscribed = true; - _this6.mqrEvents.emit('selfSubscribed', { error: null }); - _this6.identification.subscribing = false; + _this7.identification.subscribed = true; + _this7.mqrEvents.emit('selfSubscribed', { error: null }); + _this7.identification.subscribing = false; return true; } catch (cause) { - _this6.identification.subscribing = false; + _this7.identification.subscribing = false; debug(`Error self subscribing: ${cause.message}`); const error = Reflect.construct(IError, [{ name: 'MQ_ROUTER_SELF_SUBSCRIBE', - source: `${_this6.sourceIdentifier}.checkIfIsSelfSubscribedForResponses`, + source: `${_this7.sourceIdentifier}.checkIfIsSelfSubscribedForResponses`, cause }]); - _this6.mqrEvents.emit('selfSubscribed', { error }); + _this7.mqrEvents.emit('selfSubscribed', { error }); throw error; } })(); @@ -480,11 +511,11 @@ class MQRouter { exchange, version } = {}) { - var _this7 = this; + var _this8 = this; return _asyncToGenerator(function* () { try { - var _ref4 = yield _this7.queuesRoutingTable.getHandlerByConsumerTag({ consumerTag }); + var _ref4 = yield _this8.queuesRoutingTable.getHandlerByConsumerTag({ consumerTag }); const handler = _ref4.handler; @@ -499,7 +530,7 @@ class MQRouter { const responseMessage = _ref5.message; - return _this7.respondToRequest({ + return _this8.respondToRequest({ message: responseMessage, replyTo: message.id, destination: message.replyOn, @@ -509,10 +540,10 @@ class MQRouter { debug(`Error routing message - ${cause.message}`); const error = Reflect.construct(IError, [{ name: 'MQ_ROUTER_ROUTING_ERROR', - source: `${_this7.sourceIdentifier}.routeMessage`, + source: `${_this8.sourceIdentifier}.routeMessage`, cause }]); - _this7.mqrEvents.emit('error', { + _this8.mqrEvents.emit('error', { error, message, queue, @@ -543,16 +574,16 @@ class MQRouter { exchange, consumerTag }) { - var _this8 = this; + var _this9 = this; return _asyncToGenerator(function* () { try { - const unserializedMessage = yield _this8.mqMessage.from(message); - const version = _this8.mqKnownMessages.find(function (el) { + const unserializedMessage = yield _this9.mqMessage.from(message); + const version = _this9.mqKnownMessages.find(function (el) { return unserializedMessage instanceof el; }); if (version) { - return _this8.routeMessage({ + return _this9.routeMessage({ message: unserializedMessage, queue, topic, @@ -563,7 +594,7 @@ class MQRouter { } throw Reflect.construct(IError, [{ name: 'MQ_ROUTER_UNKNOWN_MESSAGE_TYPE', - source: `${_this8.sourceIdentifier}.consumeMessages`, + source: `${_this9.sourceIdentifier}.consumeMessages`, extra: { unserializedMessage } @@ -572,10 +603,10 @@ class MQRouter { debug(`Error consuming message - ${cause.message}`); const error = Reflect.construct(IError, [{ name: 'MQ_ROUTER_CONSUME_ERROR', - source: `${_this8.sourceIdentifier}.consumeMessages`, + source: `${_this9.sourceIdentifier}.consumeMessages`, cause }]); - _this8.mqrEvents.emit('error', { + _this9.mqrEvents.emit('error', { error, message, queue, @@ -603,13 +634,13 @@ class MQRouter { destination, version }) { - var _this9 = this; + var _this10 = this; return _asyncToGenerator(function* () { if (message === null) { return true; } - return _this9.sendMQMsg({ + return _this10.sendMQMsg({ isRequest: false, message, destination, @@ -635,11 +666,11 @@ class MQRouter { topic, exchange }) { - var _this10 = this; + var _this11 = this; return _asyncToGenerator(function* () { - if (_this10.defaultHandler) { - var _ref6 = yield _this10.defaultHandler.apply(_this10.defaultHandler, [{ + if (_this11.defaultHandler) { + var _ref6 = yield _this11.defaultHandler.apply(_this11.defaultHandler, [{ message: message.message, queue, topic, @@ -650,7 +681,7 @@ class MQRouter { // if consumer fails to send expected response it is a programming error // and it should crash program so it can be early corrected - yield _this10.respondToRequest({ + yield _this11.respondToRequest({ message: responseMessage, replyTo: message.id, destination: message.replyOn @@ -659,9 +690,9 @@ class MQRouter { } const error = Reflect.construct(IError, [{ name: 'MQ_ROUTER_OWN_HANDLER', - source: `${_this10.sourceIdentifier}.ownHandler` + source: `${_this11.sourceIdentifier}.ownHandler` }]); - _this10.mqrEvents.emit('error', { + _this11.mqrEvents.emit('error', { error, message, queue, @@ -687,22 +718,22 @@ class MQRouter { replyTo = '', to = '' }) { - var _this11 = this; + var _this12 = this; return _asyncToGenerator(function* () { if (!(message instanceof Buffer)) { debug('message is not a buffer'); throw Reflect.construct(IError, [{ name: 'MQ_ROUTER_BUILD_REQUEST_NO_BUFFER', - source: `${_this11.sourceIdentifier}.buildRequest` + source: `${_this12.sourceIdentifier}.buildRequest` }]); } - const id = _this11.getMessageId(); - const serializedMessage = (yield _this11.mqMessage.from({ + const id = _this12.getMessageId(); + const serializedMessage = (yield _this12.mqMessage.from({ id, replyTo, - replyOn: _this11.returnDestination, - from: _this11.identification.name, + replyOn: _this12.returnDestination, + from: _this12.identification.name, to, message }, version)).toPB(); @@ -730,7 +761,7 @@ class MQRouter { * @private */ validateDestination({ queue } = {}) { - var _this12 = this; + var _this13 = this; return _asyncToGenerator(function* () { if (queue && queue.length !== 0) { @@ -738,7 +769,7 @@ class MQRouter { } throw Reflect.construct(IError, [{ name: 'MQ_ROUTER_VALIDATE_DESTINATION', - source: `${_this12.sourceIdentifier}.validateDestination` + source: `${_this13.sourceIdentifier}.validateDestination` }]); })(); } @@ -749,11 +780,11 @@ class MQRouter { * @public */ close() { - var _this13 = this; + var _this14 = this; return _asyncToGenerator(function* () { - yield _this13.connector.close(); - return _this13.requestsRoutingTable.close(); + yield _this14.connector.close(); + return _this14.requestsRoutingTable.close(); })(); } } diff --git a/lib/v6x/QueuesRoutingTable.js b/lib/v6x/QueuesRoutingTable.js index 7561839..3030059 100644 --- a/lib/v6x/QueuesRoutingTable.js +++ b/lib/v6x/QueuesRoutingTable.js @@ -153,6 +153,26 @@ class QueuesRoutingTable { source: `${this.sourceIdentifier}.getHandlerByConsumerTag` }]); } + + /** + * locate refs by properties + * @param {String} queue queue name + * @param {String} topic topic name + * @param {String} exchange exchange name + * @returns {Object} index and consumerTag + * @public + */ + getHandlerRefsByProperties({ queue, topic, exchange }) { + try { + var _Object$values$find = Object.values(this.handlers).find(handler => handler.queue === queue && handler.topic === topic && handler.exchange === exchange); + + const consumerTag = _Object$values$find.consumerTag; + + return { consumerTag, index: this.consumerTags[consumerTag] }; + } catch (cause) { + return null; + } + } } module.exports = { diff --git a/package-lock.json b/package-lock.json index 4ddd50c..a1e74d7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,9 +22,9 @@ } }, "@itavy/mq-connector": { - "version": "0.0.2", - "resolved": "https://registry.npmjs.org/@itavy/mq-connector/-/mq-connector-0.0.2.tgz", - "integrity": "sha512-wfgFxCJLYOd0rQtEn6nJwrLw+TAx1nXXpwbCDIlvA1rrhCsskoSr9XW1kQgNUg1yw4UH2TXBxczqc8Uj7FDPuw==", + "version": "0.1.0", + "resolved": "https://nexus.netop.com/repository/portal-group/@itavy/mq-connector/-/mq-connector-0.1.0.tgz", + "integrity": "sha512-bFrOF3z0qd8fGunPce6VMIOVK//LDaxc/uKM/9TEvMGmfxlMdGGcoRujY12+yrt8cC+BV283eukPCH60ITPP8A==", "requires": { "@itavy/ierror": "1.1.2", "amqplib": "0.5.2", @@ -35,7 +35,7 @@ "dependencies": { "debug": { "version": "2.6.9", - "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", + "resolved": "https://nexus.netop.com/repository/portal-group/debug/-/debug-2.6.9.tgz", "integrity": "sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==", "requires": { "ms": "2.0.0" @@ -210,7 +210,7 @@ }, "amqplib": { "version": "0.5.2", - "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.5.2.tgz", + "resolved": "https://nexus.netop.com/repository/portal-group/amqplib/-/amqplib-0.5.2.tgz", "integrity": "sha512-l9mCs6LbydtHqRniRwYkKdqxVa6XMz3Vw1fh+2gJaaVgTM6Jk3o8RccAKWKtlhT1US5sWrFh+KKxsVUALURSIA==", "requires": { "bitsyntax": "0.0.4", @@ -1144,7 +1144,7 @@ }, "bitsyntax": { "version": "0.0.4", - "resolved": "https://registry.npmjs.org/bitsyntax/-/bitsyntax-0.0.4.tgz", + "resolved": "https://nexus.netop.com/repository/portal-group/bitsyntax/-/bitsyntax-0.0.4.tgz", "integrity": "sha1-6xDMb4K4xJDj6FaY8H6D1G4MuoI=", "requires": { "buffer-more-ints": "0.0.2" @@ -1204,7 +1204,7 @@ }, "buffer-more-ints": { "version": "0.0.2", - "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-0.0.2.tgz", + "resolved": "https://nexus.netop.com/repository/portal-group/buffer-more-ints/-/buffer-more-ints-0.0.2.tgz", "integrity": "sha1-JrOIXRD6E9t/wBquOquHAZngEkw=" }, "builtin-modules": { @@ -6289,7 +6289,7 @@ }, "p-event": { "version": "1.3.0", - "resolved": "https://registry.npmjs.org/p-event/-/p-event-1.3.0.tgz", + "resolved": "https://nexus.netop.com/repository/portal-group/p-event/-/p-event-1.3.0.tgz", "integrity": "sha1-jmtPT2XHK8W2/ii3XtqHT5akoIU=", "requires": { "p-timeout": "1.2.1" @@ -6297,7 +6297,7 @@ }, "p-finally": { "version": "1.0.0", - "resolved": "https://registry.npmjs.org/p-finally/-/p-finally-1.0.0.tgz", + "resolved": "https://nexus.netop.com/repository/portal-group/p-finally/-/p-finally-1.0.0.tgz", "integrity": "sha1-P7z7FbiZpEEjs0ttzBi3JDNqLK4=" }, "p-limit": { @@ -6320,7 +6320,7 @@ }, "p-timeout": { "version": "1.2.1", - "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-1.2.1.tgz", + "resolved": "https://nexus.netop.com/repository/portal-group/p-timeout/-/p-timeout-1.2.1.tgz", "integrity": "sha1-XrOzU7f86Z8QGhA4iAuwVOu+o4Y=", "requires": { "p-finally": "1.0.0" @@ -6676,7 +6676,7 @@ }, "readable-stream": { "version": "1.1.14", - "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", + "resolved": "https://nexus.netop.com/repository/portal-group/readable-stream/-/readable-stream-1.1.14.tgz", "integrity": "sha1-fPTFTvZI44EwhMY23SB54WbAgdk=", "requires": { "core-util-is": "1.0.2", @@ -7337,7 +7337,7 @@ }, "string_decoder": { "version": "0.10.31", - "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "resolved": "https://nexus.netop.com/repository/portal-group/string_decoder/-/string_decoder-0.10.31.tgz", "integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=" }, "stringstream": { diff --git a/package.json b/package.json index c843f05..63f8f63 100644 --- a/package.json +++ b/package.json @@ -50,7 +50,7 @@ }, "dependencies": { "@itavy/ierror": "^1.1.2", - "@itavy/mq-connector": "^0.0.2", + "@itavy/mq-connector": "^0.1.0", "@itavy/mq-structure": "^0.0.1", "debug": "~3.1.0", "semver": "~5.4.1", diff --git a/test/MQRouter/Unsubscribe.js b/test/MQRouter/Unsubscribe.js new file mode 100644 index 0000000..2cf9309 --- /dev/null +++ b/test/MQRouter/Unsubscribe.js @@ -0,0 +1,77 @@ +'use strict'; + +const { expect, getSinonSandbox } = require('@itavy/test-utilities'); +const { MQRouter } = require('../../'); +const { + queue, + mqURI, + name, + dummyQueue, + dummyTopic, + exchange, + randomId, +} = require('./Fixtures'); + +describe('Unsubscribe', () => { + let testRouter; + let dummyResolveHandler; + const sandbox = getSinonSandbox(); + + beforeEach((done) => { + testRouter = Reflect.construct(MQRouter, [{ + name, + queue, + mqURI, + }]); + dummyResolveHandler = sandbox.mock().resolves(); + done(); + }); + + afterEach(async () => { + await testRouter.close(); + sandbox.restore(); + }); + + it('Should successfully unsubscribe from channel', () => { + const cTagTest = randomId(30); + sandbox.stub(testRouter.connector, 'subscribe') + .resolves({ + queue: dummyQueue, + consumerTag: cTagTest, + }); + + sandbox.stub(testRouter.connector, 'unsubscribe') + .resolves(true); + + return testRouter.subscribe({ + handler: dummyResolveHandler, + queue: dummyQueue, + topic: dummyTopic, + exchange, + }) + .should.be.fulfilled + .then(() => testRouter.unsubscribe({ + queue: dummyQueue, + topic: dummyTopic, + exchange, + }) + .should.be.fulfilled + .then((success) => { + expect(success).to.be.eql(true); + })); + }); + + it('Should fail on unregistered queue', () => { + sandbox.stub(testRouter.queuesRoutingTable, 'getHandlerRefsByProperties').throws(new Error('')); + + return testRouter.unsubscribe({ + queue: dummyQueue, + topic: dummyTopic, + exchange, + }) + .should.be.rejected + .then((error) => { + expect(error.name).to.be.eql('MQ_ROUTER_UNSUBSCRIBE'); + }); + }); +}); diff --git a/test/MQRouter/index.js b/test/MQRouter/index.js index 3a57b5d..67fe1e1 100644 --- a/test/MQRouter/index.js +++ b/test/MQRouter/index.js @@ -19,6 +19,7 @@ describe('MQRouter', () => { require('./CheckIfIsSelfSubscribedForResponses'); require('./SendRequest'); require('./Subscribe'); + require('./Unsubscribe'); }); /* eslint-enable global-require */ diff --git a/test/QueuesRoutingTable/Unregister.js b/test/QueuesRoutingTable/Unregister.js index e82e38f..1376984 100644 --- a/test/QueuesRoutingTable/Unregister.js +++ b/test/QueuesRoutingTable/Unregister.js @@ -2,7 +2,14 @@ const { QueuesRoutingTable } = require('../../'); const { expect } = require('@itavy/test-utilities'); -const { addRecords, randomId, randomNumber } = require('./Fixtures'); +const { + addRecords, + randomId, + randomNumber, + dummyQueue, + dummyTopic, + exchange, +} = require('./Fixtures'); describe('Unregister', () => { @@ -63,4 +70,14 @@ describe('Unregister', () => { expect(result).to.be.equal(true); done(); }); + + it('Should return null if queue not registered', (done) => { + const result = testTable.getHandlerRefsByProperties({ + queue: dummyQueue, + topic: dummyTopic, + exchange, + }); + expect(result).to.be.equal(null); + done(); + }); }); From 778cb8bfbb9773a6025afa7e321627d0d3ba54bf Mon Sep 17 00:00:00 2001 From: Paul Apostol Date: Tue, 26 Jun 2018 09:32:03 +0000 Subject: [PATCH 2/4] Code style, returns and tests for unsubscription. --- lib/latest/MQRouter.js | 21 +++++-- lib/latest/QueuesRoutingTable.js | 25 +++++--- lib/v6x/MQRouter.js | 25 +++++--- lib/v6x/QueuesRoutingTable.js | 16 ++++- test/MQRouter/Unsubscribe.js | 29 ++++++--- .../GetHandlerRefsByProperties.js | 60 +++++++++++++++++++ test/QueuesRoutingTable/Unregister.js | 13 ---- test/QueuesRoutingTable/index.js | 1 + 8 files changed, 144 insertions(+), 46 deletions(-) create mode 100644 test/QueuesRoutingTable/GetHandlerRefsByProperties.js diff --git a/lib/latest/MQRouter.js b/lib/latest/MQRouter.js index d911598..8e23e1e 100644 --- a/lib/latest/MQRouter.js +++ b/lib/latest/MQRouter.js @@ -246,17 +246,26 @@ class MQRouter { /** * Unsubscribe from a queue - * @param {Object} mqProps mq subscription properties + * @param {String} queue queue name + * @param {String} topic topic name + * @param {String} exchange exchange name * @returns {Boolean} returns true if unsubscribed * @public */ - async unsubscribe(mqProps) { - let consumerTag = null; - let index = null; + async unsubscribe({ + queue, + topic, + exchange, + }) { try { - ({ consumerTag, index } = this.queuesRoutingTable.getHandlerRefsByProperties(mqProps)); + const { consumerTag, index } = this.queuesRoutingTable.getHandlerRefsByProperties({ + queue, + topic, + exchange, + }); + await this.connector.unsubscribe({ consumerTag }); this.queuesRoutingTable.unregister({ index }); - return await this.connector.unsubscribe({ consumerTag }); + return true; } catch (cause) { debug(`error unsubscribing - ${cause.message}`); throw Reflect.construct(IError, [{ diff --git a/lib/latest/QueuesRoutingTable.js b/lib/latest/QueuesRoutingTable.js index 9f48427..aa400ef 100644 --- a/lib/latest/QueuesRoutingTable.js +++ b/lib/latest/QueuesRoutingTable.js @@ -149,15 +149,26 @@ class QueuesRoutingTable { * @returns {Object} index and consumerTag * @public */ - getHandlerRefsByProperties({ queue, topic, exchange }) { + getHandlerRefsByProperties({ + queue, + topic, + exchange, + }) { try { - const { consumerTag } = Object.values(this.handlers).find(handler => - handler.queue === queue && - handler.topic === topic && - handler.exchange === exchange); - return { consumerTag, index: this.consumerTags[consumerTag] }; + const { consumerTag } = Object.values(this.handlers) + .find(handler => + handler.queue === queue && + handler.topic === topic && + handler.exchange === exchange); + return { + consumerTag, + index: this.consumerTags[consumerTag], + }; } catch (cause) { - return null; + return { + consumerTag: null, + index: null, + }; } } } diff --git a/lib/v6x/MQRouter.js b/lib/v6x/MQRouter.js index 71eb638..3244d9d 100644 --- a/lib/v6x/MQRouter.js +++ b/lib/v6x/MQRouter.js @@ -288,24 +288,33 @@ class MQRouter { /** * Unsubscribe from a queue - * @param {Object} mqProps mq subscription properties + * @param {String} queue queue name + * @param {String} topic topic name + * @param {String} exchange exchange name * @returns {Boolean} returns true if unsubscribed * @public */ - unsubscribe(mqProps) { + unsubscribe({ + queue, + topic, + exchange + }) { var _this4 = this; return _asyncToGenerator(function* () { - let consumerTag = null; - let index = null; try { - var _queuesRoutingTable$g = _this4.queuesRoutingTable.getHandlerRefsByProperties(mqProps); + var _queuesRoutingTable$g = _this4.queuesRoutingTable.getHandlerRefsByProperties({ + queue, + topic, + exchange + }); - consumerTag = _queuesRoutingTable$g.consumerTag; - index = _queuesRoutingTable$g.index; + const consumerTag = _queuesRoutingTable$g.consumerTag, + index = _queuesRoutingTable$g.index; + yield _this4.connector.unsubscribe({ consumerTag }); _this4.queuesRoutingTable.unregister({ index }); - return yield _this4.connector.unsubscribe({ consumerTag }); + return true; } catch (cause) { debug(`error unsubscribing - ${cause.message}`); throw Reflect.construct(IError, [{ diff --git a/lib/v6x/QueuesRoutingTable.js b/lib/v6x/QueuesRoutingTable.js index 3030059..308db06 100644 --- a/lib/v6x/QueuesRoutingTable.js +++ b/lib/v6x/QueuesRoutingTable.js @@ -162,15 +162,25 @@ class QueuesRoutingTable { * @returns {Object} index and consumerTag * @public */ - getHandlerRefsByProperties({ queue, topic, exchange }) { + getHandlerRefsByProperties({ + queue, + topic, + exchange + }) { try { var _Object$values$find = Object.values(this.handlers).find(handler => handler.queue === queue && handler.topic === topic && handler.exchange === exchange); const consumerTag = _Object$values$find.consumerTag; - return { consumerTag, index: this.consumerTags[consumerTag] }; + return { + consumerTag, + index: this.consumerTags[consumerTag] + }; } catch (cause) { - return null; + return { + consumerTag: null, + index: null + }; } } } diff --git a/test/MQRouter/Unsubscribe.js b/test/MQRouter/Unsubscribe.js index 2cf9309..e620414 100644 --- a/test/MQRouter/Unsubscribe.js +++ b/test/MQRouter/Unsubscribe.js @@ -32,7 +32,7 @@ describe('Unsubscribe', () => { sandbox.restore(); }); - it('Should successfully unsubscribe from channel', () => { + it('Should successfully unsubscribe from channel', (done) => { const cTagTest = randomId(30); sandbox.stub(testRouter.connector, 'subscribe') .resolves({ @@ -40,31 +40,40 @@ describe('Unsubscribe', () => { consumerTag: cTagTest, }); - sandbox.stub(testRouter.connector, 'unsubscribe') + const qrHandlerRefsSpy = sandbox.stub(testRouter.queuesRoutingTable, 'getHandlerRefsByProperties') + .returns(cTagTest); + + const connectorUnsubscribeSpy = sandbox.stub(testRouter.connector, 'unsubscribe') .resolves(true); - return testRouter.subscribe({ + sandbox.stub(testRouter.queuesRoutingTable, 'unregister') + .returns(true); + + testRouter.subscribe({ handler: dummyResolveHandler, queue: dummyQueue, topic: dummyTopic, exchange, }) - .should.be.fulfilled .then(() => testRouter.unsubscribe({ queue: dummyQueue, topic: dummyTopic, exchange, }) .should.be.fulfilled - .then((success) => { - expect(success).to.be.eql(true); + .then((result) => { + expect(qrHandlerRefsSpy.callCount).to.be.eql(1); + expect(connectorUnsubscribeSpy.callCount).to.be.eql(1); + expect(result).to.be.eql(true); + done(); })); }); - it('Should fail on unregistered queue', () => { - sandbox.stub(testRouter.queuesRoutingTable, 'getHandlerRefsByProperties').throws(new Error('')); + it('Should fail on unregistered queue', (done) => { + sandbox.stub(testRouter.queuesRoutingTable, 'getHandlerRefsByProperties') + .throws(new Error('test')); - return testRouter.unsubscribe({ + testRouter.unsubscribe({ queue: dummyQueue, topic: dummyTopic, exchange, @@ -72,6 +81,8 @@ describe('Unsubscribe', () => { .should.be.rejected .then((error) => { expect(error.name).to.be.eql('MQ_ROUTER_UNSUBSCRIBE'); + expect(error.cause.message).to.be.eql('test'); + done(); }); }); }); diff --git a/test/QueuesRoutingTable/GetHandlerRefsByProperties.js b/test/QueuesRoutingTable/GetHandlerRefsByProperties.js new file mode 100644 index 0000000..ea05f5f --- /dev/null +++ b/test/QueuesRoutingTable/GetHandlerRefsByProperties.js @@ -0,0 +1,60 @@ +'use strict'; + +const { QueuesRoutingTable } = require('../../'); +const { expect } = require('@itavy/test-utilities'); +const { + randomId, + randomNumber, + dummyQueue, + dummyTopic, + exchange, +} = require('./Fixtures'); + + +describe('GetHandlerRefsByProperties', () => { + let testTable = null; + + beforeEach((done) => { + testTable = Reflect.construct(QueuesRoutingTable, [{ + name: 'testTABLE', + }]); + done(); + }); + + afterEach((done) => { + done(); + }); + + + it('Should return right properties for registered queue', (done) => { + const pos = randomNumber(15); + const rec = { + queue: `testQueue${pos}`, + topic: `testTopic${pos}`, + exchange: `testExchange${pos}`, + handler: {}, + }; + const cTagTest = randomId(30); + const { index } = testTable.register(rec); + testTable.update({ + index, + consumerTag: cTagTest, + queue: rec.queue, + }); + const result = testTable.getHandlerRefsByProperties(rec); + expect(result.index).to.be.equal(index); + expect(result.consumerTag).to.be.equal(cTagTest); + done(); + }); + + it('Should return an object with null consumerTag and index if queue not registered', (done) => { + const result = testTable.getHandlerRefsByProperties({ + queue: dummyQueue, + topic: dummyTopic, + exchange, + }); + expect(result.consumerTag).to.be.equal(null); + expect(result.index).to.be.equal(null); + done(); + }); +}); diff --git a/test/QueuesRoutingTable/Unregister.js b/test/QueuesRoutingTable/Unregister.js index 1376984..2a3abba 100644 --- a/test/QueuesRoutingTable/Unregister.js +++ b/test/QueuesRoutingTable/Unregister.js @@ -6,9 +6,6 @@ const { addRecords, randomId, randomNumber, - dummyQueue, - dummyTopic, - exchange, } = require('./Fixtures'); @@ -70,14 +67,4 @@ describe('Unregister', () => { expect(result).to.be.equal(true); done(); }); - - it('Should return null if queue not registered', (done) => { - const result = testTable.getHandlerRefsByProperties({ - queue: dummyQueue, - topic: dummyTopic, - exchange, - }); - expect(result).to.be.equal(null); - done(); - }); }); diff --git a/test/QueuesRoutingTable/index.js b/test/QueuesRoutingTable/index.js index 114f53a..8811f18 100644 --- a/test/QueuesRoutingTable/index.js +++ b/test/QueuesRoutingTable/index.js @@ -7,4 +7,5 @@ describe('QueuesRoutingTable', () => { require('./Unregister'); // eslint-disable-line global-require require('./GetHandlerByIndex'); // eslint-disable-line global-require require('./GetHandlerByConsumerTag'); // eslint-disable-line global-require + require('./GetHandlerRefsByProperties'); // eslint-disable-line global-require }); From 7dad11ca605f4ce6cc22acb46ff8e51cd96815c1 Mon Sep 17 00:00:00 2001 From: Paul Apostol Date: Tue, 26 Jun 2018 13:23:36 +0000 Subject: [PATCH 3/4] Refactoring unsubscribe and its tests. --- lib/latest/MQRouter.js | 19 ++++++++++-------- lib/v6x/MQRouter.js | 25 +++++++++++++----------- test/MQRouter/Unsubscribe.js | 37 ++++++++++++++++++++++++++++++++++-- 3 files changed, 60 insertions(+), 21 deletions(-) diff --git a/lib/latest/MQRouter.js b/lib/latest/MQRouter.js index 8e23e1e..56f6b00 100644 --- a/lib/latest/MQRouter.js +++ b/lib/latest/MQRouter.js @@ -257,15 +257,18 @@ class MQRouter { topic, exchange, }) { + const { consumerTag, index } = this.queuesRoutingTable.getHandlerRefsByProperties({ + queue, + topic, + exchange, + }); try { - const { consumerTag, index } = this.queuesRoutingTable.getHandlerRefsByProperties({ - queue, - topic, - exchange, - }); - await this.connector.unsubscribe({ consumerTag }); - this.queuesRoutingTable.unregister({ index }); - return true; + if (consumerTag && index) { + await this.connector.unsubscribe({ consumerTag }); + this.queuesRoutingTable.unregister({ index }); + return true; + } + return false; } catch (cause) { debug(`error unsubscribing - ${cause.message}`); throw Reflect.construct(IError, [{ diff --git a/lib/v6x/MQRouter.js b/lib/v6x/MQRouter.js index 3244d9d..a5f974f 100644 --- a/lib/v6x/MQRouter.js +++ b/lib/v6x/MQRouter.js @@ -302,19 +302,22 @@ class MQRouter { var _this4 = this; return _asyncToGenerator(function* () { - try { - var _queuesRoutingTable$g = _this4.queuesRoutingTable.getHandlerRefsByProperties({ - queue, - topic, - exchange - }); + var _queuesRoutingTable$g = _this4.queuesRoutingTable.getHandlerRefsByProperties({ + queue, + topic, + exchange + }); - const consumerTag = _queuesRoutingTable$g.consumerTag, - index = _queuesRoutingTable$g.index; + const consumerTag = _queuesRoutingTable$g.consumerTag, + index = _queuesRoutingTable$g.index; - yield _this4.connector.unsubscribe({ consumerTag }); - _this4.queuesRoutingTable.unregister({ index }); - return true; + try { + if (consumerTag && index) { + yield _this4.connector.unsubscribe({ consumerTag }); + _this4.queuesRoutingTable.unregister({ index }); + return true; + } + return false; } catch (cause) { debug(`error unsubscribing - ${cause.message}`); throw Reflect.construct(IError, [{ diff --git a/test/MQRouter/Unsubscribe.js b/test/MQRouter/Unsubscribe.js index e620414..da2314b 100644 --- a/test/MQRouter/Unsubscribe.js +++ b/test/MQRouter/Unsubscribe.js @@ -32,8 +32,9 @@ describe('Unsubscribe', () => { sandbox.restore(); }); - it('Should successfully unsubscribe from channel', (done) => { + it('Should successfully unsubscribe from channel and return true', (done) => { const cTagTest = randomId(30); + const cIndexTest = randomId(30); sandbox.stub(testRouter.connector, 'subscribe') .resolves({ queue: dummyQueue, @@ -41,7 +42,10 @@ describe('Unsubscribe', () => { }); const qrHandlerRefsSpy = sandbox.stub(testRouter.queuesRoutingTable, 'getHandlerRefsByProperties') - .returns(cTagTest); + .returns({ + index: cIndexTest, + consumerTag: cTagTest, + }); const connectorUnsubscribeSpy = sandbox.stub(testRouter.connector, 'unsubscribe') .resolves(true); @@ -69,8 +73,36 @@ describe('Unsubscribe', () => { })); }); + it('Should not unsubscribe on unregistered queue and return false', (done) => { + sandbox.stub(testRouter.queuesRoutingTable, 'getHandlerRefsByProperties') + .returns({ + index: null, + consumerTag: null, + }); + + testRouter.unsubscribe({ + queue: dummyQueue, + topic: dummyTopic, + exchange, + }) + .should.be.fulfilled + .then((result) => { + expect(result).to.be.eql(false); + done(); + }); + }); + it('Should fail on unregistered queue', (done) => { + const cTagTest = randomId(30); + const cIndexTest = randomId(30); + sandbox.stub(testRouter.queuesRoutingTable, 'getHandlerRefsByProperties') + .returns({ + index: cIndexTest, + consumerTag: cTagTest, + }); + + const connectorUnsubscribeSpy = sandbox.stub(testRouter.connector, 'unsubscribe') .throws(new Error('test')); testRouter.unsubscribe({ @@ -80,6 +112,7 @@ describe('Unsubscribe', () => { }) .should.be.rejected .then((error) => { + expect(connectorUnsubscribeSpy.callCount).to.be.eql(1); expect(error.name).to.be.eql('MQ_ROUTER_UNSUBSCRIBE'); expect(error.cause.message).to.be.eql('test'); done(); From a5fd8e771e6692ae4cd2a9b3cdf5381e6aa25715 Mon Sep 17 00:00:00 2001 From: Paul Apostol Date: Wed, 27 Jun 2018 15:11:45 +0000 Subject: [PATCH 4/4] Catch errors on getHandlerRefsByProperties. No errors when serching for handler refs. --- lib/latest/MQRouter.js | 11 ++++++----- lib/latest/QueuesRoutingTable.js | 27 ++++++++++++++------------- lib/v6x/MQRouter.js | 17 +++++++++-------- lib/v6x/QueuesRoutingTable.js | 20 +++++++++----------- test/MQRouter/Unsubscribe.js | 2 +- 5 files changed, 39 insertions(+), 38 deletions(-) diff --git a/lib/latest/MQRouter.js b/lib/latest/MQRouter.js index 56f6b00..e0310dc 100644 --- a/lib/latest/MQRouter.js +++ b/lib/latest/MQRouter.js @@ -257,12 +257,13 @@ class MQRouter { topic, exchange, }) { - const { consumerTag, index } = this.queuesRoutingTable.getHandlerRefsByProperties({ - queue, - topic, - exchange, - }); try { + const { consumerTag, index } = this.queuesRoutingTable.getHandlerRefsByProperties({ + queue, + topic, + exchange, + }); + if (consumerTag && index) { await this.connector.unsubscribe({ consumerTag }); this.queuesRoutingTable.unregister({ index }); diff --git a/lib/latest/QueuesRoutingTable.js b/lib/latest/QueuesRoutingTable.js index aa400ef..6cb8bcd 100644 --- a/lib/latest/QueuesRoutingTable.js +++ b/lib/latest/QueuesRoutingTable.js @@ -154,22 +154,23 @@ class QueuesRoutingTable { topic, exchange, }) { - try { - const { consumerTag } = Object.values(this.handlers) - .find(handler => - handler.queue === queue && - handler.topic === topic && - handler.exchange === exchange); - return { - consumerTag, - index: this.consumerTags[consumerTag], - }; - } catch (cause) { + const handler = Object.values(this.handlers) + .find(el => + el.queue === queue && + el.topic === topic && + el.exchange === exchange); + + if (handler && handler.consumerTag) { return { - consumerTag: null, - index: null, + consumerTag: handler.consumerTag, + index: this.consumerTags[handler.consumerTag], }; } + + return { + consumerTag: null, + index: null, + }; } } diff --git a/lib/v6x/MQRouter.js b/lib/v6x/MQRouter.js index a5f974f..936d7b0 100644 --- a/lib/v6x/MQRouter.js +++ b/lib/v6x/MQRouter.js @@ -302,16 +302,17 @@ class MQRouter { var _this4 = this; return _asyncToGenerator(function* () { - var _queuesRoutingTable$g = _this4.queuesRoutingTable.getHandlerRefsByProperties({ - queue, - topic, - exchange - }); + try { + var _queuesRoutingTable$g = _this4.queuesRoutingTable.getHandlerRefsByProperties({ + queue, + topic, + exchange + }); + + const consumerTag = _queuesRoutingTable$g.consumerTag, + index = _queuesRoutingTable$g.index; - const consumerTag = _queuesRoutingTable$g.consumerTag, - index = _queuesRoutingTable$g.index; - try { if (consumerTag && index) { yield _this4.connector.unsubscribe({ consumerTag }); _this4.queuesRoutingTable.unregister({ index }); diff --git a/lib/v6x/QueuesRoutingTable.js b/lib/v6x/QueuesRoutingTable.js index 308db06..59705bd 100644 --- a/lib/v6x/QueuesRoutingTable.js +++ b/lib/v6x/QueuesRoutingTable.js @@ -167,21 +167,19 @@ class QueuesRoutingTable { topic, exchange }) { - try { - var _Object$values$find = Object.values(this.handlers).find(handler => handler.queue === queue && handler.topic === topic && handler.exchange === exchange); + const handler = Object.values(this.handlers).find(el => el.queue === queue && el.topic === topic && el.exchange === exchange); - const consumerTag = _Object$values$find.consumerTag; - - return { - consumerTag, - index: this.consumerTags[consumerTag] - }; - } catch (cause) { + if (handler && handler.consumerTag) { return { - consumerTag: null, - index: null + consumerTag: handler.consumerTag, + index: this.consumerTags[handler.consumerTag] }; } + + return { + consumerTag: null, + index: null + }; } } diff --git a/test/MQRouter/Unsubscribe.js b/test/MQRouter/Unsubscribe.js index da2314b..ec17736 100644 --- a/test/MQRouter/Unsubscribe.js +++ b/test/MQRouter/Unsubscribe.js @@ -92,7 +92,7 @@ describe('Unsubscribe', () => { }); }); - it('Should fail on unregistered queue', (done) => { + it('Should fail on not subscribed queue', (done) => { const cTagTest = randomId(30); const cIndexTest = randomId(30);