diff --git a/README.md b/README.md index 3219119..ddab56d 100755 --- a/README.md +++ b/README.md @@ -194,6 +194,27 @@ return q.addJob(job).then((savedJobs) => { ``` +### Reconnection for the change feeds +If `reconnect` options are supplied in the connection during the queue creation, change-feeds will automatically attempt reconnection based on the `maxAttempts`. + +````js +const Queue = require('rethinkdb-job-queue') + +const cxnOptions = { + host: 'localhost', + port: 28015, + db : 'JobQueue', // The name of the database in RethinkDB + + 'reconnect': { // The reconnection options for change feeds + 'pingInterval': 59, // in seconds, same as rethinkdbdash `pingInterval` option + 'maxAttempts': 99999, // maximum no. of times reconnect should be attempted + 'attemptDelay': 5000 // delay factor in milli-seconds between attempts + } + +const q = new Queue(cxnOptions, { name: 'my-queue' }) +```` +Note: This reconnection is applicable only for change-feeds and does not facilitate reconnection for other parts of the package (such as initial db-assertion, job update db writes etc.). + ## About the Owner I, Grant Carthew, am a technologist, trainer, and Dad from Queensland, Australia. I work on code in a number of personal projects and when the need arises I build my own packages. diff --git a/src/db-driver.js b/src/db-driver.js index 62463cd..a9dc026 100755 --- a/src/db-driver.js +++ b/src/db-driver.js @@ -13,7 +13,8 @@ module.exports = function dbDriver (cxn) { cxn.port != null || is.string(cxn.db)) { logger('cxn is an options object') - cxnCopy.silent = true + cxnCopy.pingInterval = cxnCopy.reconnect?.pingInterval || enums.options.reconnect.pingInterval; + cxnCopy.silent = cxnCopy.reconnect?.silent || enums.options.reconnect.silent; cxnCopy.host = cxnCopy.host == null ? enums.options.host : cxnCopy.host cxnCopy.port = cxnCopy.port == null diff --git a/src/enums.js b/src/enums.js index 1c0c4b9..7a74b75 100755 --- a/src/enums.js +++ b/src/enums.js @@ -47,6 +47,7 @@ const options = Object.freeze({ host: 'localhost', port: 28015, db: 'rjqJobQueue', + reconnect: { silent: true, pingInterval: -1, maxAttempts: 0, attemptDelay: 5000 }, queryRunOptions: { readMode: 'majority' }, databaseInitDelay: 1000, masterInterval: 310000, // 5 minutes and 10 seconds diff --git a/src/queue-db.js b/src/queue-db.js index 1d5341b..5017e2c 100755 --- a/src/queue-db.js +++ b/src/queue-db.js @@ -7,6 +7,53 @@ const dbAssert = require('./db-assert') const dbReview = require('./db-review') const queueChange = require('./queue-change') const dbDriver = require('./db-driver') +const { ReqlDriverError, ReqlServerError } = require('rethinkdbdash/lib/error') + +function _isConnectionError(err) { + // Credit: https://github.com/LearnersGuild/rethinkdb-changefeed-reconnect/blob/master/src/index.js#L82 + // FIXME: I'm not terribly happy about this particular logic, but + // unfortunately, rethinkdbdash doesn't provide a consistent error type (or + // even message) when it's having trouble connecting to a changefeed, + // particularly if it is connecting via a rethinkdb proxy server. + return (err instanceof ReqlServerError) || + (err instanceof ReqlDriverError) || + (err.msg && err.msg.match(/Changefeed\saborted/)) || + (err.msg && err.msg.match(/primary\sreplica.*not\savailable/)); +} + +function tryReconnect(q, error, maxAttempts, attemptDelay, nRetryAttempt) { + // if we are detaching (or detached), lets not crash the app with connection errors. + if (q._dbDetached) return error; + // if this is connection error, lets try reconnecting. + if (_isConnectionError(error)) { + // if no further attempts left, throw it as it is. + if (++nRetryAttempt > maxAttempts) + throw error; + // try reconnection after some linear delay. + logger(`connection error, retry after ${nRetryAttempt * attemptDelay / 1000} sec`); + return Promise.resolve().delay(nRetryAttempt * attemptDelay) + .then(() => monitorChangeFeed(q, { maxAttempts, attemptDelay }, nRetryAttempt)); + } + throw error; +} + +function monitorChangeFeed(q, { + maxAttempts = enums.options.reconnect.maxAttempts, + attemptDelay = enums.options.reconnect.attemptDelay } = {}, nRetryAttempt = 0) { + + logger('monitorChangeFeed'); + + return q.r.db(q.db).table(q.name).changes().run(q.queryRunOptions).then(function (changeFeed) { + // we connected successfully, lets reset the counter + nRetryAttempt = 0; + // fetch each change and act on it + q._changeFeedCursor = changeFeed; + return q._changeFeedCursor.each(function (error, change) { + if (error) return tryReconnect(q, error, maxAttempts, attemptDelay, nRetryAttempt); + return queueChange(q, error, change); + }); + }).catch(error => tryReconnect(q, error, maxAttempts, attemptDelay, nRetryAttempt)); +} module.exports.attach = function dbAttach (q, cxn) { logger('attach') @@ -23,16 +70,7 @@ module.exports.attach = function dbAttach (q, cxn) { ].join(':') q._ready = dbAssert(q).then(() => { if (q.changeFeed) { - return q.r.db(q.db) - .table(q.name) - .changes() - .run(q.queryRunOptions) - .then((changeFeed) => { - q._changeFeedCursor = changeFeed - return q._changeFeedCursor.each((err, change) => { - return queueChange(q, err, change) - }) - }) + return monitorChangeFeed(q, cxn.reconnect); } q._changeFeedCursor = false return null @@ -47,12 +85,14 @@ module.exports.attach = function dbAttach (q, cxn) { q.emit(enums.status.ready, q.id) return true }) + q._dbDetached = false; return q._ready } module.exports.detach = function dbDetach (q) { logger('detach') return Promise.resolve().then(() => { + q._dbDetached = true; if (q._changeFeedCursor) { let feed = q._changeFeedCursor q._changeFeedCursor = false