Skip to content
This repository was archived by the owner on Jul 3, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,9 @@ reader.on('discard', function(msg){

var writer = nsq.writer(':4150');

writer.on('ready', function() {
writer.publish('events', 'foo');
writer.publish('events', 'bar');
writer.publish('events', 'baz');
});
writer.publish('events', 'foo');
writer.publish('events', 'bar');
writer.publish('events', 'baz');
```

## API
Expand Down Expand Up @@ -137,7 +135,8 @@ Events:

Publish the given `message` to `topic` where `message`
may be a string, buffer, or object. An array of messages
may be passed, in which case a MPUT is performed.
may be passed, in which case a MPUT is performed. It will
wait for a connection to be established.

### writer#close([fn])
Close the writer's connection(s) and fire the optional [fn] when completed.
Expand Down
27 changes: 18 additions & 9 deletions lib/writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,27 @@ Writer.prototype.publish = function(topic, msg, fn){
msg = coerce(msg);
}

// connection pool
var size = this.conns.size();
var i = this.n++ % size;
var conn = this.conns.values()[i];
if (!conn) return fn(new Error('no nsqd nodes connected'));

// publish
debug('%s - publish', conn.addr);
if (Array.isArray(msg)) {
conn.mpublish(topic, msg, fn);
if (size) {
// connection pool
var i = this.n++ % size;
var conn = this.conns.values()[i];

// publish
debug('%s - publish', conn.addr);
if (Array.isArray(msg)) {
conn.mpublish(topic, msg, fn);
} else {
conn.publish(topic, msg, fn);
}
} else {
conn.publish(topic, msg, fn);
// wait for ready and retry
var self = this;

this.once('ready', function(){
self.publish(topic, msg, fn)
});
}
};

Expand Down
15 changes: 1 addition & 14 deletions test/acceptance/writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ describe('Writer#publish()', function(){
var pub = nsq.writer();
var sub = new Connection;

pub.on('ready', function(){
pub.publish('test', 'something');
});
pub.publish('test', 'something');

sub.on('ready', function(){
sub.subscribe('test', 'tailer');
Expand All @@ -32,17 +30,6 @@ describe('Writer#publish()', function(){
sub.connect();
})

it('should invoke callbacks with errors', function(done){
var pub = nsq.writer({ port: 5000 });

pub.on('error', function(){});

pub.publish('test', 'something', function(err){
err.message.should.equal('no nsqd nodes connected');
done();
});
})

it('should emit "error"', function(done){
var pub = nsq.writer({ port: 5000 });

Expand Down