diff --git a/lib/beanstalk_worker.coffee b/lib/beanstalk_worker.coffee index 6b6be4f..01728b3 100644 --- a/lib/beanstalk_worker.coffee +++ b/lib/beanstalk_worker.coffee @@ -1,41 +1,38 @@ -events: require 'events' -client: require('beanstalk_client').Client +events = require 'events' +client = require('beanstalk_client').Client class BeanstalkWorker extends events.EventEmitter - constructor: (id, server, handlers, logger) -> - @id: id - @server: server - @handlers: handlers - @logger: logger || console + constructor: (@id, @server, @handlers, @logger) -> + @logger or= console - @stopped: false + @stopped = false start: (tubes, ignore_default) -> @log 'Starting...' - @on 'next', () => + @on 'next', () => #this comes from the events.EventEmitter parent class @handle_next() client.connect @server, (err, conn) => if err @log 'Error connecting: ' + err else - @connection: conn + @connection = conn @watch_tubes tubes, () => - ignored: [] + ignored = [] if ignore_default - ignored.push 'default' + ignored.push 'default' @ignore_tubes ignored, () => - @log 'Started' - @emit 'next' + @log 'Started' + @emit 'next' #emit comes from the events.EventEmitter parent class watch_tubes: (tubes, done) -> - if tubes && (tube: tubes[0]) + if tubes and (tube = tubes[0]) @log 'Watching tube ' + tube @connection.watch tube, (err) => if err @@ -47,7 +44,7 @@ class BeanstalkWorker extends events.EventEmitter ignore_tubes: (tubes, done) -> - if tubes && (tube: tubes[0]) + if tubes and (tube = tubes[0]) @log 'Ignoring tube ' + tube @connection.ignore tube, (err) => if err @@ -60,7 +57,7 @@ class BeanstalkWorker extends events.EventEmitter stop: () -> @log 'Stopping...' - @stopped: true + @stopped = true find_handler: (job_type) -> for handler in @handlers @@ -74,94 +71,106 @@ class BeanstalkWorker extends events.EventEmitter @log 'Stopped' return - @connection.reserve_with_timeout 5, (err, job_id, job_json) => + @connection.reserve_with_timeout(5, (err, job_id, job_json) => if err if 'TIMED_OUT' == err - @emit('next') + @emit 'next' else @log 'Error reserving job : ' + err.toString(); else + job = null + try - job: JSON.parse(job_json) + job = JSON.parse(job_json) catch e - job: null @log 'Error parsing job JSON : ' + job_id + ' : ' + e.toString() if job? - @handle_job(job_id, job) + @handle_job job_id, job else - @log 'Error handling job : ' + job_id + ' : couldn\'t parse job : ' + job_json - @bury_and_emit_next(job_id) + @log "Error handling job : #{job_id} : couldn't parse job : #{job_json}" + @bury_and_emit_next job_id + ) handle_job: (job_id, job) -> - handler: @find_handler job.type + handler = @find_handler job.type if handler? - @run_handler_on_job_data(handler, job_id, job.data) + @run_handler_on_job_data handler, job_id, job.data else - @log 'Error handling job : ' + job_id + ' : no handler for ' + JSON.stringify(job) - @bury_and_emit_next(job_id) + @log "Error handling job : #{job_id} : no handler for #{JSON.stringify(job)}" + @bury_and_emit_next job_id run_handler_on_job_data: (handler, job_id, job_data) -> - start: new Date().getTime() + start = new Date().getTime() try - job_canceled: false + job_canceled = false handler job_data, (action, data) => if !job_canceled - duration: new Date().getTime() - start + duration = new Date().getTime() - start - if !action? || ('next' == action) - @log 'Ran job : ' + job_id + ' in ' + duration + 'ms (' + JSON.stringify(job_data) + ')' - @destroy_and_emit_next(job_id) + #the following will emit next if the handler function doesn't call back with + #anything at all or if the first parameter is = to next + #TODO: what should be done with the data param if it is passed in from handler? + if !action? or ('next' == action) + @log "Ran job : #{job_id} in #{duration} ms (#{JSON.stringify(job_data)})" + @destroy_and_emit_next job_id else if 'release' == action - job_canceled: true - @log('Released job : ' + job_id + ' after ' + duration + 'ms') - @release_and_emit_next(job_id, data) + job_canceled = true + @log "Released job : #{job_id} after #{duration} ms)" + #in the function call to release_and_emit_next below we expect the + #data parameter we are passing through to be a whole number of + #seconds to delay before allowing the job to be retried? + @release_and_emit_next job_id, data else if 'bury' == action - @log('Buried job : ' + job_id) - @bury_and_emit_next(job_id) + @log 'Buried job : ' + job_id + @bury_and_emit_next job_id else - @log('Failed to run job : ' + job_id + ' : ' + reason) + reason = action + @log "Failed to run job : #{job_id} : #{reason}" + @bury_and_emit_next job_id - catch ex - @log 'Exception running job : ' + job_id + ' : ' + ex.toString() - @bury_and_emit_next(job_id) + catch e + @log "Exception running job : #{job_id} : #{e.toString()}" + @bury_and_emit_next job_id bury_and_emit_next: (job_id) -> - @connection.bury job_id, client.LOWEST_PRIORITY, (err) => + @connection.bury(job_id, client.LOWEST_PRIORITY, (err) => if err - @log 'Error burying job : ' + job_id + ' : ' + err.toString() - @emit('next') + @log "Error burying job : #{job_id} : #{err.toString()}" + @emit 'next' + ) release_and_emit_next: (job_id, delay) -> if !delay? - delay: 30 - @connection.release job_id, client.LOWEST_PRIORITY, delay, (err) => + delay = 30 #TODO: is this the number of seconds to wait before job can retry? + @connection.release(job_id, client.LOWEST_PRIORITY, delay, (err) => if err - @log 'Error releasing job : ' + job_id + ' : ' + err.toString() - @emit('next') + @log "Error releasing job : #{job_id} : #{err.toString()}" + @emit 'next' + ) destroy_and_emit_next: (job_id) -> - @connection.destroy job_id, (err) => + @connection.destroy(job_id, (err) => if err - @log 'Error destroying job : ' + job_id + ' : ' + err.toString() - @emit('next') - + @log "Error destroying job : #{job_id} : #{err.toString()}" + @emit 'next' + ) log: (message) -> - @logger.log('[ ' + new Date().toString() + ' ] [ ' + process.pid + ' (' + @id + ') ] : ' + message) + @logger.log "[ #{new Date().toString()} ] [ #{process.pid} ( #{@id} ) ] : #{message}" -exports.BeanstalkWorker: BeanstalkWorker +exports.BeanstalkWorker = BeanstalkWorker diff --git a/lib/beanstalk_worker.js b/lib/beanstalk_worker.js index e08d3c9..d57b25a 100644 --- a/lib/beanstalk_worker.js +++ b/lib/beanstalk_worker.js @@ -1,180 +1,207 @@ -(function(){ - var BeanstalkWorker, client, events; - var __slice = Array.prototype.slice, __bind = function(func, obj, args) { - return function() { - return func.apply(obj || {}, args ? args.concat(__slice.call(arguments, 0)) : arguments); - }; - }, __extends = function(child, parent) { - var ctor = function(){ }; - ctor.prototype = parent.prototype; - child.__superClass__ = parent.prototype; - child.prototype = new ctor(); - child.prototype.constructor = child; - }; +(function() { + var BeanstalkWorker, client, events, + __hasProp = Object.prototype.hasOwnProperty, + __extends = function(child, parent) { for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor; child.__super__ = parent.prototype; return child; }; + events = require('events'); + client = require('beanstalk_client').Client; - BeanstalkWorker = function(id, server, handlers, logger) { - this.id = id; - this.server = server; - this.handlers = handlers; - this.logger = logger || console; - this.stopped = false; - return this; - }; - __extends(BeanstalkWorker, events.EventEmitter); - BeanstalkWorker.prototype.start = function(tubes, ignore_default) { - this.log('Starting...'); - this.on('next', __bind(function() { - return this.handle_next(); - }, this)); - return client.connect(this.server, __bind(function(err, conn) { + + BeanstalkWorker = (function(_super) { + + __extends(BeanstalkWorker, _super); + + function BeanstalkWorker(id, server, handlers, logger) { + this.id = id; + this.server = server; + this.handlers = handlers; + this.logger = logger; + this.logger || (this.logger = console); + this.stopped = false; + } + + BeanstalkWorker.prototype.start = function(tubes, ignore_default) { + var _this = this; + this.log('Starting...'); + this.on('next', function() { + return _this.handle_next(); + }); + return client.connect(this.server, function(err, conn) { if (err) { - return this.log('Error connecting: ' + err); + return _this.log('Error connecting: ' + err); } else { - this.connection = conn; - return this.watch_tubes(tubes, __bind(function() { - var ignored; - ignored = []; - ignore_default ? ignored.push('default') : null; - return this.ignore_tubes(ignored, __bind(function() { - this.log('Started'); - return this.emit('next'); - }, this)); - }, this)); + _this.connection = conn; + return _this.watch_tubes(tubes, function() { + var ignored; + ignored = []; + if (ignore_default) ignored.push('default'); + return _this.ignore_tubes(ignored, function() { + _this.log('Started'); + return _this.emit('next'); + }); + }); } - }, this)); - }; - BeanstalkWorker.prototype.watch_tubes = function(tubes, done) { - var tube; - if (tubes && (tube = tubes[0])) { - this.log('Watching tube ' + tube); - return this.connection.watch(tube, __bind(function(err) { - err ? this.log('Error watching tube : ' + tube) : null; - return this.watch_tubes(tubes.slice(1), done); - }, this)); - } else { - return done(); - } - }; - BeanstalkWorker.prototype.ignore_tubes = function(tubes, done) { - var tube; - if (tubes && (tube = tubes[0])) { - this.log('Ignoring tube ' + tube); - return this.connection.ignore(tube, __bind(function(err) { - err ? this.log('Error ignoring tube : ' + tube) : null; - return this.ignore_tubes(tubes.slice(1), done); - }, this)); - } else { - return done(); - } - }; - BeanstalkWorker.prototype.stop = function() { - this.log('Stopping...'); - this.stopped = true; - return this.stopped; - }; - BeanstalkWorker.prototype.find_handler = function(job_type) { - var _a, _b, _c, handler; - _b = this.handlers; - for (_a = 0, _c = _b.length; _a < _c; _a++) { - handler = _b[_a]; - if (handler[job_type]) { - return handler[job_type]; + }); + }; + + BeanstalkWorker.prototype.watch_tubes = function(tubes, done) { + var tube, + _this = this; + if (tubes && (tube = tubes[0])) { + this.log('Watching tube ' + tube); + return this.connection.watch(tube, function(err) { + if (err) _this.log('Error watching tube : ' + tube); + return _this.watch_tubes(tubes.slice(1), done); + }); + } else { + return done(); + } + }; + + BeanstalkWorker.prototype.ignore_tubes = function(tubes, done) { + var tube, + _this = this; + if (tubes && (tube = tubes[0])) { + this.log('Ignoring tube ' + tube); + return this.connection.ignore(tube, function(err) { + if (err) _this.log('Error ignoring tube : ' + tube); + return _this.ignore_tubes(tubes.slice(1), done); + }); + } else { + return done(); + } + }; + + BeanstalkWorker.prototype.stop = function() { + this.log('Stopping...'); + return this.stopped = true; + }; + + BeanstalkWorker.prototype.find_handler = function(job_type) { + var handler, _i, _len, _ref; + _ref = this.handlers; + for (_i = 0, _len = _ref.length; _i < _len; _i++) { + handler = _ref[_i]; + if (handler[job_type]) return handler[job_type]; } - } - return null; - }; - BeanstalkWorker.prototype.handle_next = function() { - if (this.stopped) { - this.connection.end(); - this.log('Stopped'); return null; - } - return this.connection.reserve_with_timeout(5, __bind(function(err, job_id, job_json) { + }; + + BeanstalkWorker.prototype.handle_next = function() { + var _this = this; + if (this.stopped) { + this.connection.end(); + this.log('Stopped'); + return; + } + return this.connection.reserve_with_timeout(5, function(err, job_id, job_json) { var job; if (err) { if ('TIMED_OUT' === err) { - return this.emit('next'); + return _this.emit('next'); } else { - return this.log('Error reserving job : ' + err.toString()); + return _this.log('Error reserving job : ' + err.toString()); } } else { + job = null; try { job = JSON.parse(job_json); } catch (e) { - job = null; - this.log('Error parsing job JSON : ' + job_id + ' : ' + e.toString()); + _this.log('Error parsing job JSON : ' + job_id + ' : ' + e.toString()); } - if ((typeof job !== "undefined" && job !== null)) { - return this.handle_job(job_id, job); + if (job != null) { + return _this.handle_job(job_id, job); } else { - this.log('Error handling job : ' + job_id + ' : couldn\'t parse job : ' + job_json); - return this.bury_and_emit_next(job_id); + _this.log("Error handling job : " + job_id + " : couldn't parse job : " + job_json); + return _this.bury_and_emit_next(job_id); } } - }, this)); - }; - BeanstalkWorker.prototype.handle_job = function(job_id, job) { - var handler; - handler = this.find_handler(job.type); - if ((typeof handler !== "undefined" && handler !== null)) { - return this.run_handler_on_job_data(handler, job_id, job.data); - } else { - this.log('Error handling job : ' + job_id + ' : no handler for ' + JSON.stringify(job)); - return this.bury_and_emit_next(job_id); - } - }; - BeanstalkWorker.prototype.run_handler_on_job_data = function(handler, job_id, job_data) { - var job_canceled, start; - start = new Date().getTime(); - try { - job_canceled = false; - return handler(job_data, __bind(function(action, data) { - var duration; + }); + }; + + BeanstalkWorker.prototype.handle_job = function(job_id, job) { + var handler; + handler = this.find_handler(job.type); + if (handler != null) { + return this.run_handler_on_job_data(handler, job_id, job.data); + } else { + this.log("Error handling job : " + job_id + " : no handler for " + (JSON.stringify(job))); + return this.bury_and_emit_next(job_id); + } + }; + + BeanstalkWorker.prototype.run_handler_on_job_data = function(handler, job_id, job_data) { + var job_canceled, start, + _this = this; + start = new Date().getTime(); + try { + job_canceled = false; + return handler(job_data, function(action, data) { + var duration, reason; if (!job_canceled) { duration = new Date().getTime() - start; - if (!(typeof action !== "undefined" && action !== null) || ('next' === action)) { - this.log('Ran job : ' + job_id + ' in ' + duration + 'ms (' + JSON.stringify(job_data) + ')'); - return this.destroy_and_emit_next(job_id); + if (!(action != null) || ('next' === action)) { + _this.log("Ran job : " + job_id + " in " + duration + " ms (" + (JSON.stringify(job_data)) + ")"); + return _this.destroy_and_emit_next(job_id); } else if ('release' === action) { job_canceled = true; - this.log('Released job : ' + job_id + ' after ' + duration + 'ms'); - return this.release_and_emit_next(job_id, data); + _this.log("Released job : " + job_id + " after " + duration + " ms)"); + return _this.release_and_emit_next(job_id, data); } else if ('bury' === action) { - this.log('Buried job : ' + job_id); - return this.bury_and_emit_next(job_id); + _this.log('Buried job : ' + job_id); + return _this.bury_and_emit_next(job_id); } else { - return this.log('Failed to run job : ' + job_id + ' : ' + reason); + reason = action; + _this.log("Failed to run job : " + job_id + " : " + reason); + return _this.bury_and_emit_next(job_id); } } - }, this)); - } catch (ex) { - this.log('Exception running job : ' + job_id + ' : ' + ex.toString()); - return this.bury_and_emit_next(job_id); - } - }; - BeanstalkWorker.prototype.bury_and_emit_next = function(job_id) { - return this.connection.bury(job_id, client.LOWEST_PRIORITY, __bind(function(err) { - err ? this.log('Error burying job : ' + job_id + ' : ' + err.toString()) : null; - return this.emit('next'); - }, this)); - }; - BeanstalkWorker.prototype.release_and_emit_next = function(job_id, delay) { - !(typeof delay !== "undefined" && delay !== null) ? (delay = 30) : null; - return this.connection.release(job_id, client.LOWEST_PRIORITY, delay, __bind(function(err) { - err ? this.log('Error releasing job : ' + job_id + ' : ' + err.toString()) : null; - return this.emit('next'); - }, this)); - }; - BeanstalkWorker.prototype.destroy_and_emit_next = function(job_id) { - return this.connection.destroy(job_id, __bind(function(err) { - err ? this.log('Error destroying job : ' + job_id + ' : ' + err.toString()) : null; - return this.emit('next'); - }, this)); - }; - BeanstalkWorker.prototype.log = function(message) { - return this.logger.log('[ ' + new Date().toString() + ' ] [ ' + process.pid + ' (' + this.id + ') ] : ' + message); - }; + }); + } catch (e) { + this.log("Exception running job : " + job_id + " : " + (e.toString())); + return this.bury_and_emit_next(job_id); + } + }; + + BeanstalkWorker.prototype.bury_and_emit_next = function(job_id) { + var _this = this; + return this.connection.bury(job_id, client.LOWEST_PRIORITY, function(err) { + if (err) { + _this.log("Error burying job : " + job_id + " : " + (err.toString())); + } + return _this.emit('next'); + }); + }; + + BeanstalkWorker.prototype.release_and_emit_next = function(job_id, delay) { + var _this = this; + if (!(delay != null)) delay = 30; + return this.connection.release(job_id, client.LOWEST_PRIORITY, delay, function(err) { + if (err) { + _this.log("Error releasing job : " + job_id + " : " + (err.toString())); + } + return _this.emit('next'); + }); + }; + + BeanstalkWorker.prototype.destroy_and_emit_next = function(job_id) { + var _this = this; + return this.connection.destroy(job_id, function(err) { + if (err) { + _this.log("Error destroying job : " + job_id + " : " + (err.toString())); + } + return _this.emit('next'); + }); + }; + + BeanstalkWorker.prototype.log = function(message) { + return this.logger.log("[ " + (new Date().toString()) + " ] [ " + process.pid + " ( " + this.id + " ) ] : " + message); + }; + + return BeanstalkWorker; + + })(events.EventEmitter); exports.BeanstalkWorker = BeanstalkWorker; -})(); + +}).call(this);