Skip to content

Commit 84261b2

Browse files
[feature] Refactor requests between nodes and add clientRooms method (#146)
This PR refactors the way requests are sent between nodes. The `subJson` client has been removed, the subClient now listens to three channels by default: - the usual channel, where the message are broadcast - the request channel, where any node can request information (for now, the clients in a given rooms or the rooms for a given client) - the response channel, where a node will receive answer to its requests
1 parent c38e323 commit 84261b2

File tree

4 files changed

+266
-79
lines changed

4 files changed

+266
-79
lines changed

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ The following options are allowed:
3535
- `subEvent`: optional, the redis client event name to subscribe to (`message`)
3636
- `pubClient`: optional, the redis client to publish events on
3737
- `subClient`: optional, the redis client to subscribe to events on
38-
- `clientsTimeout`: optional, after this timeout the adapter will stop waiting from responses to `clients` request (`1000ms`)
38+
- `requestsTimeout`: optional, after this timeout the adapter will stop waiting from responses to request (`1000ms`)
3939

4040
If you decide to supply `pubClient` and `subClient`, make sure you use
4141
[node_redis](https://github.com/mranney/node_redis) as a client or one
@@ -56,12 +56,16 @@ that a regular `Adapter` does not
5656
- `prefix`
5757
- `pubClient`
5858
- `subClient`
59-
- `clientsTimeout`
59+
- `requestsTimeout`
6060

6161
### RedisAdapter#clients(rooms:Array, fn:Function)
6262

6363
Returns the list of client IDs connected to `rooms` across all nodes. See [Namespace#clients(fn:Function)](https://github.com/socketio/socket.io#namespaceclientsfnfunction)
6464

65+
### RedisAdapter#clientRooms(id:String, fn:Function)
66+
67+
Returns the list of rooms the client with the given ID has joined (even on another node).
68+
6569
## Client error handling
6670

6771
Access the `pubClient` and `subClient` properties of the

index.js

Lines changed: 180 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@ var async = require('async');
1616

1717
module.exports = adapter;
1818

19+
/**
20+
* Request types, for messages between nodes
21+
*/
22+
23+
var requestTypes = {
24+
clients: 0,
25+
clientRooms: 1,
26+
};
27+
1928
/**
2029
* Returns a redis Adapter class.
2130
*
@@ -39,7 +48,7 @@ function adapter(uri, opts){
3948

4049
var prefix = opts.key || 'socket.io';
4150
var subEvent = opts.subEvent || 'message';
42-
var clientsTimeout = opts.clientsTimeout || 1000;
51+
var requestsTimeout = opts.requestsTimeout || 1000;
4352

4453
// init clients if needed
4554
function createClient(redis_opts) {
@@ -50,11 +59,9 @@ function adapter(uri, opts){
5059
return redis(opts.port, opts.host, redis_opts);
5160
}
5261
}
53-
62+
5463
if (!pub) pub = createClient();
5564
if (!sub) sub = createClient({ return_buffers: true });
56-
57-
var subJson = sub.duplicate({ return_buffers: false });
5865

5966
// this server's key
6067
var uid = uid2(6);
@@ -71,10 +78,12 @@ function adapter(uri, opts){
7178

7279
this.uid = uid;
7380
this.prefix = prefix;
74-
this.clientsTimeout = clientsTimeout;
81+
this.requestsTimeout = requestsTimeout;
7582

7683
this.channel = prefix + '#' + nsp.name + '#';
77-
this.syncChannel = prefix + '-sync#request#' + this.nsp.name + '#';
84+
this.requestChannel = prefix + '-request#' + this.nsp.name + '#';
85+
this.responseChannel = prefix + '-response#' + this.nsp.name + '#';
86+
this.requests = {};
7887

7988
if (String.prototype.startsWith) {
8089
this.channelMatches = function (messageChannel, subscribedChannel) {
@@ -90,16 +99,11 @@ function adapter(uri, opts){
9099

91100
var self = this;
92101

93-
sub.subscribe(this.channel, function(err){
94-
if (err) self.emit('error', err);
95-
});
96-
97-
subJson.subscribe(this.syncChannel, function(err){
102+
sub.subscribe([this.channel, this.requestChannel, this.responseChannel], function(err){
98103
if (err) self.emit('error', err);
99104
});
100105

101106
sub.on(subEvent, this.onmessage.bind(this));
102-
subJson.on(subEvent, this.onclients.bind(this));
103107
}
104108

105109
/**
@@ -115,9 +119,16 @@ function adapter(uri, opts){
115119
*/
116120

117121
Redis.prototype.onmessage = function(channel, msg){
118-
if (!this.channelMatches(channel.toString(), this.channel)) {
122+
channel = channel.toString();
123+
124+
if (this.channelMatches(channel, this.requestChannel)) {
125+
return this.onrequest(channel, msg);
126+
} else if (this.channelMatches(channel, this.responseChannel)) {
127+
return this.onresponse(channel, msg);
128+
} else if (!this.channelMatches(channel, this.channel)) {
119129
return debug('ignore different channel');
120130
}
131+
121132
var args = msgpack.decode(msg);
122133
var packet;
123134

@@ -139,40 +150,119 @@ function adapter(uri, opts){
139150
};
140151

141152
/**
142-
* Called with a subscription message on sync
153+
* Called on request from another node
143154
*
144155
* @api private
145156
*/
146157

147-
Redis.prototype.onclients = function(channel, msg){
148-
158+
Redis.prototype.onrequest = function(channel, msg){
149159
var self = this;
160+
var request;
150161

151-
if (!self.channelMatches(channel.toString(), self.syncChannel)) {
152-
return debug('ignore different channel');
162+
try {
163+
request = JSON.parse(msg);
164+
} catch(err){
165+
self.emit('error', err);
166+
return;
153167
}
154168

169+
debug('received request %j', request);
170+
171+
switch (request.type) {
172+
173+
case requestTypes.clients:
174+
Adapter.prototype.clients.call(self, request.rooms, function(err, clients){
175+
if(err){
176+
self.emit('error', err);
177+
return;
178+
}
179+
180+
var response = JSON.stringify({
181+
requestid: request.requestid,
182+
clients: clients
183+
});
184+
185+
pub.publish(self.responseChannel, response);
186+
});
187+
break;
188+
189+
case requestTypes.clientRooms:
190+
Adapter.prototype.clientRooms.call(self, request.sid, function(err, rooms){
191+
if(err){
192+
self.emit('error', err);
193+
return;
194+
}
195+
196+
if (!rooms) { return; }
197+
198+
var response = JSON.stringify({
199+
requestid: request.requestid,
200+
rooms: rooms
201+
});
202+
203+
pub.publish(self.responseChannel, response);
204+
});
205+
break;
206+
207+
default:
208+
debug('ignoring unknown request type: %s', request.type);
209+
}
210+
};
211+
212+
/**
213+
* Called on response from another node
214+
*
215+
* @api private
216+
*/
217+
218+
Redis.prototype.onresponse = function(channel, msg){
219+
var self = this;
220+
var response;
221+
155222
try {
156-
var decoded = JSON.parse(msg);
223+
response = JSON.parse(msg);
157224
} catch(err){
158225
self.emit('error', err);
159226
return;
160227
}
161228

162-
Adapter.prototype.clients.call(self, decoded.rooms, function(err, clients){
163-
if(err){
164-
self.emit('error', err);
165-
return;
166-
}
229+
if (!response.requestid || !self.requests[response.requestid]) {
230+
debug('ignoring unknown request');
231+
return;
232+
}
167233

168-
var responseChn = prefix + '-sync#response#' + decoded.transaction;
169-
var response = JSON.stringify({
170-
clients : clients
171-
});
234+
debug('received response %j', response);
172235

173-
pub.publish(responseChn, response);
174-
});
175-
236+
var request = self.requests[response.requestid];
237+
238+
switch (request.type) {
239+
240+
case requestTypes.clients:
241+
request.msgCount++;
242+
243+
// ignore if response does not contain 'clients' key
244+
if(!response.clients || !Array.isArray(response.clients)) return;
245+
246+
for(var i = 0; i < response.clients.length; i++){
247+
request.clients[response.clients[i]] = true;
248+
}
249+
250+
if (request.msgCount === request.numsub) {
251+
clearTimeout(request.timeout);
252+
if (request.callback) process.nextTick(request.callback.bind(null, null, Object.keys(request.clients)));
253+
delete self.requests[request.requestid];
254+
}
255+
break;
256+
257+
case requestTypes.clientRooms:
258+
clearTimeout(request.timeout);
259+
if (request.callback) process.nextTick(request.callback.bind(null, null, response.rooms));
260+
delete self.requests[request.requestid];
261+
break;
262+
263+
default:
264+
debug('ignoring unknown request type: %s', request.type);
265+
}
176266
};
177267

178268
/**
@@ -292,6 +382,7 @@ function adapter(uri, opts){
292382
* Gets a list of clients by sid.
293383
*
294384
* @param {Array} explicit set of rooms to check.
385+
* @param {Function} callback
295386
* @api public
296387
*/
297388

@@ -304,11 +395,9 @@ function adapter(uri, opts){
304395
rooms = rooms || [];
305396

306397
var self = this;
398+
var requestid = uid2(6);
307399

308-
var transaction = uid2(6);
309-
var responseChn = prefix + '-sync#response#' + transaction;
310-
311-
pub.send_command('pubsub', ['numsub', self.syncChannel], function(err, numsub){
400+
pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
312401
if (err) {
313402
self.emit('error', err);
314403
if (fn) fn(err);
@@ -317,64 +406,78 @@ function adapter(uri, opts){
317406

318407
numsub = numsub[1];
319408

320-
var msg_count = 0;
321-
var clients = {};
322-
323-
subJson.subscribe(responseChn, function(err) {
324-
if (err) {
325-
self.emit('error', err);
326-
if (fn) fn(err);
327-
return;
328-
}
329-
330-
var request = JSON.stringify({
331-
transaction : transaction,
332-
rooms : rooms
333-
});
334-
335-
/*If there is no response for 1 second, return result;*/
336-
var timeout = setTimeout(function() {
337-
if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients)));
338-
}, self.clientsTimeout);
409+
var request = JSON.stringify({
410+
requestid : requestid,
411+
type: requestTypes.clients,
412+
rooms : rooms
413+
});
339414

340-
subJson.on(subEvent, function onEvent(channel, msg) {
415+
// if there is no response for x second, return result
416+
var timeout = setTimeout(function() {
417+
var request = self.requests[requestid];
418+
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for clients response'), Object.keys(request.clients)));
419+
delete self.requests[requestid];
420+
}, self.requestsTimeout);
421+
422+
self.requests[requestid] = {
423+
type: requestTypes.clients,
424+
numsub: numsub,
425+
msgCount: 0,
426+
clients: {},
427+
callback: fn,
428+
timeout: timeout
429+
};
430+
431+
pub.publish(self.requestChannel, request);
432+
});
433+
};
341434

342-
if (!self.channelMatches(channel.toString(), responseChn)) {
343-
return debug('ignore different channel');
344-
}
435+
/**
436+
* Gets the list of rooms a given client has joined.
437+
*
438+
* @param {String} client id
439+
* @param {Function} callback
440+
* @api public
441+
*/
345442

346-
var response = JSON.parse(msg);
443+
Redis.prototype.clientRooms = function(id, fn){
347444

348-
//Ignore if response does not contain 'clients' key
349-
if(!response.clients || !Array.isArray(response.clients)) return;
350-
351-
for(var i = 0; i < response.clients.length; i++){
352-
clients[response.clients[i]] = true;
353-
}
445+
var self = this;
446+
var requestid = uid2(6);
354447

355-
msg_count++;
356-
if(msg_count == numsub){
357-
clearTimeout(timeout);
358-
subJson.unsubscribe(responseChn);
359-
subJson.removeListener(subEvent, onEvent);
448+
var rooms = this.sids[id];
360449

361-
if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients)));
362-
}
363-
});
450+
if (rooms) {
451+
if (fn) process.nextTick(fn.bind(null, null, Object.keys(rooms)));
452+
return;
453+
}
364454

365-
pub.publish(self.syncChannel, request);
455+
var request = JSON.stringify({
456+
requestid : requestid,
457+
type: requestTypes.clientRooms,
458+
sid : id
459+
});
366460

367-
});
461+
// if there is no response for x second, return result
462+
var timeout = setTimeout(function() {
463+
if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for rooms response')));
464+
delete self.requests[requestid];
465+
}, self.requestsTimeout);
368466

369-
});
467+
self.requests[requestid] = {
468+
type: requestTypes.clientRooms,
469+
callback: fn,
470+
timeout: timeout
471+
};
370472

473+
pub.publish(self.requestChannel, request);
371474
};
372475

373476
Redis.uid = uid;
374477
Redis.pubClient = pub;
375478
Redis.subClient = sub;
376479
Redis.prefix = prefix;
377-
Redis.clientsTimeout = clientsTimeout;
480+
Redis.requestsTimeout = requestsTimeout;
378481

379482
return Redis;
380483

0 commit comments

Comments
 (0)