Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions lib/latest/MQRouter.js
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,41 @@ class MQRouter {
}
}

/**
* Unsubscribe from a queue
* @param {String} queue queue name
* @param {String} topic topic name
* @param {String} exchange exchange name
* @returns {Boolean} returns true if unsubscribed
* @public
*/
async unsubscribe({
queue,
topic,
exchange,
}) {
const { consumerTag, index } = this.queuesRoutingTable.getHandlerRefsByProperties({
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this fails it will not be caught by catch and will throw a non standard error

queue,
topic,
exchange,
});
try {
if (consumerTag && index) {
await this.connector.unsubscribe({ consumerTag });
this.queuesRoutingTable.unregister({ index });
return true;
}
return false;
} catch (cause) {
debug(`error unsubscribing - ${cause.message}`);
throw Reflect.construct(IError, [{
name: 'MQ_ROUTER_UNSUBSCRIBE',
source: `${this.sourceIdentifier}.unsubscribe`,
cause,
}]);
}
}

/**
* Send request over mq
* @param {Buffer} message message to be sent
Expand Down
31 changes: 31 additions & 0 deletions lib/latest/QueuesRoutingTable.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,37 @@ class QueuesRoutingTable {
source: `${this.sourceIdentifier}.getHandlerByConsumerTag`,
}]);
}

/**
* locate refs by properties
* @param {String} queue queue name
* @param {String} topic topic name
* @param {String} exchange exchange name
* @returns {Object} index and consumerTag
* @public
*/
getHandlerRefsByProperties({
queue,
topic,
exchange,
}) {
try {
const { consumerTag } = Object.values(this.handlers)
.find(handler =>
handler.queue === queue &&
handler.topic === topic &&
handler.exchange === exchange);
return {
consumerTag,
index: this.consumerTags[consumerTag],
};
} catch (cause) {
return {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please return here a specific error with cause as cause

in try block return null if consumerTag is undefined (not found)

element not found is a perfect response from this function. for actual case it is an error, but this is valid for the caller of getHandlerRefsByProperties. there can also be a check where you want to test if there is a handler associated for info provided and if it is not you need to do something else. by throwing error here you need to wrap it and do something on catch block.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can throw one error when handler not found and one when index not found by consumerTag:

getHandlerRefsByProperties({
    queue,
    topic,
    exchange,
  }) {
    const handler = Object.values(this.handlers)
      .find(el =>
        el.queue === queue &&
        el.topic === topic &&
        el.exchange === exchange);

    if (handler) {
      if (handler.consumerTag) {
        const index = this.consumerTags[handler.consumerTag];
        if (index) {
          return {
            consumerTag: handler.consumerTag,
            index,
          };
        }
        throw Reflect.construct(IError, [{
          name:   'MQ_ROUTING_TABLE_UNKNOWN_CONSUMER_TAG',
          source: `${this.sourceIdentifier}.getHandlerRefsByProperties`,
        }]);
      }

      return {
        consumerTag: null,
        index:       null,
      };
    }

    throw Reflect.construct(IError, [{
      name:   'MQ_ROUTING_TABLE_UNKNOWN_HANDLER',
      source: `${this.sourceIdentifier}.getHandlerRefsByProperties`,
    }]);
  }
}

consumerTag: null,
index: null,
};
}
}
}

module.exports = {
Expand Down
Loading