From bb45dc377336d7c16e4a39931228bfb3237e5a04 Mon Sep 17 00:00:00 2001 From: Carson Darling Date: Wed, 30 Apr 2014 11:28:56 -0400 Subject: [PATCH 1/2] added auto-reconnect flag to listen. The function signature is now: `cumin.listen(queueName, handler, autoReconnect)`. If `autoReconnect` is a truthy value, then handlers are attached to both the block and the non-blocking client, so when a Redis connection drops, the process won't exit. In addition, when the blocking client reconnects, it restarts listening for items from the queue. I haven't added any tests for this functionality (I didn't want to expose the redis-clients just for testing), but I would be happy to add them if you have a suggested way of doing it. --- cumin.js | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/cumin.js b/cumin.js index b6c5db1..c196cab 100644 --- a/cumin.js +++ b/cumin.js @@ -113,7 +113,7 @@ module.exports = function(port, host, options) { nonBlockingClient.publish("cumin.enqueued", message, done); }, - listen: function(queueName, handler) { + listen: function(queueName, handler, autoReconnect) { if(!queueName) { throw new Error(consolePrefix, "Queue name must be provided. eg. 'emailQueue'."); } @@ -141,6 +141,30 @@ module.exports = function(port, host, options) { } continueListening("cumin." + queueName, handler); + + + if (autoReconnect) { + + blockingClient.on('end', function(err) { + alreadyListening = false; + }) + + blockingClient.on('error', function(err) { + console.warn('Blocking client disconnect.'); + }); + + nonBlockingClient.on('error', function(err) { + console.warn('Non-blocking client disconnect.'); + }); + + blockingClient.on('ready', function() { + if (!alreadyListening) { + alreadyListening = true; + continueListening('cumin.' + queueName, handler); + } + }); + + } } } } From e0e62c92fc227ea663415e39cad13308b3601250 Mon Sep 17 00:00:00 2001 From: Carson Darling Date: Wed, 30 Apr 2014 11:41:07 -0400 Subject: [PATCH 2/2] Changed ordering of arguments to listen. Also added argument shuffle to make sure we didn't break backward compatibility --- cumin.js | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cumin.js b/cumin.js index c196cab..b8b13ba 100644 --- a/cumin.js +++ b/cumin.js @@ -113,12 +113,19 @@ module.exports = function(port, host, options) { nonBlockingClient.publish("cumin.enqueued", message, done); }, - listen: function(queueName, handler, autoReconnect) { + listen: function(queueName, autoReconnect, handler) { + + // Do the argument shuffle + if (!handler) { + handler = autoReconnect; + autoReconnect = false; + } + if(!queueName) { throw new Error(consolePrefix, "Queue name must be provided. eg. 'emailQueue'."); } - if(!handler) { + if(!handler || typeof handler !== 'function') { throw new Error(consolePrefix, "You must provide a hander to .listen."); }