Skip to content

Commit d359b36

Browse files
authored
Merge pull request #6 from parse-server-modules/delete-on-sig
Delete subscription on SIGTERM / SIGINT
2 parents e375184 + a7ee56d commit d359b36

File tree

2 files changed

+57
-35
lines changed

2 files changed

+57
-35
lines changed

lib/index.js

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ const namePrefix = 'parse-server';
88
let emitter = new events.EventEmitter();
99

1010
if (!process.env.GOOGLE_APPLICATION_CREDENTIALS) {
11-
throw "set GOOGLE_APPLICATION_CREDENTIALS variable (see https://googlecloudplatform.github.io/google-cloud-node/#/docs/pubsub/0.8.0/guides/authentication)";
11+
throw 'set GOOGLE_APPLICATION_CREDENTIALS variable (see https://googlecloudplatform.github.io/google-cloud-node/#/docs/pubsub/0.8.0/guides/authentication)';
1212
}
1313

1414
if (!process.env.GCLOUD_PROJECT) {
15-
throw "set GCLOUD_PROJECT variable (see https://googlecloudplatform.github.io/google-cloud-node/#/docs/pubsub/0.8.0/guides/authentication)";
15+
throw 'set GCLOUD_PROJECT variable (see https://googlecloudplatform.github.io/google-cloud-node/#/docs/pubsub/0.8.0/guides/authentication)';
1616
}
1717

1818
/**
@@ -26,15 +26,6 @@ class Publisher {
2626

2727
constructor(emitter) {
2828
this.emitter = emitter;
29-
30-
// process.on('SIGNINT',function(){
31-
// process.exit();
32-
// });
33-
34-
// process.on('SIGTERM',function(){
35-
// process.exit();
36-
// });
37-
3829
}
3930

4031
publish(channel, message) {
@@ -53,10 +44,7 @@ class Publisher {
5344
class Subscriber extends events.EventEmitter {
5445

5546
constructor() {
56-
super();
57-
process.on('SIGNINT',function(){
58-
process.exit();
59-
});
47+
super();
6048
}
6149

6250
subscribe(channel) {
@@ -87,31 +75,67 @@ class Subscriber extends events.EventEmitter {
8775
// Here we should get all the subscriptions under the channel topic and unsubscribe them
8876
}
8977

90-
_createSubscription(topic,channel) {
78+
_createSubscription(topic, channel) {
9179

9280
const subscriptionUUID = uuid.v1();
9381
var subscriptionName = `${namePrefix}-${channel}-${subscriptionUUID}`;
9482

9583
topic.subscribe(subscriptionName, (err, subscription) => {
84+
9685
if (err) {
97-
console.log(`Failed to create subscription ${err}`);
86+
console.error(`Failed to create subscription ${err}`);
9887
return;
9988
}
10089

10190
console.log(`Subscription ${subscription.name} created.`);
102-
process.on('SIGTERM', () => {
103-
subscription.delete();
104-
});
105-
process.on('SIGINT', () => {
106-
subscription.delete();
107-
});
108-
subscription.on('message', (message) => {
109-
if (message.ackId) {
110-
message.ack();
91+
92+
function deleteSubscription() {
93+
removeListeners();
94+
console.log('Subscriber: Signal received, deleting subscription');
95+
subscription.delete().then(() => {
96+
console.log('Subscriber: subscription deleted...');
97+
}, (err) => {
98+
console.error(`Subscriber: Error deleting subscription`, err);
99+
});
100+
}
101+
102+
function messageHandler(message) {
103+
if (message.ackId) {
104+
message.ack();
105+
}
106+
let topicName = `${namePrefix}-${channel}`;
107+
this.emit('message', channel , message.data);
108+
};
109+
110+
function errorHandler(err) {
111+
console.error('Subscriber Error: ', err);
112+
// Handle when subscription gets deleted
113+
// Attempt to recreate it
114+
if (err.code == 404) {
115+
console.log('Subscriber: will recreate subscription');
116+
removeListeners();
117+
this._createSubscription(topic, channel);
111118
}
112-
let topicName = `${namePrefix}-${channel}`;
113-
this.emit('message', channel , message.data);
114-
});
119+
}
120+
121+
const onMessage = messageHandler.bind(this);
122+
const onError = errorHandler.bind(this);
123+
124+
// Remove the listenerds
125+
function removeListeners() {
126+
subscription.removeListener('message', onMessage);
127+
subscription.removeListener('error', onError);
128+
process.removeListener('SIGTERM', deleteSubscription);
129+
process.removeListener('SIGINT', deleteSubscription);
130+
}
131+
132+
// Handle termination, delete the subscription (require graceful shutdowm)
133+
process.on('SIGTERM', deleteSubscription);
134+
process.on('SIGINT', deleteSubscription);
135+
136+
// Bind the subscription
137+
subscription.on('message', onMessage);
138+
subscription.on('error', onError);
115139
});
116140
}
117141
}
@@ -124,9 +148,7 @@ function createSubscriber() {
124148
return new Subscriber();
125149
}
126150

127-
let GcpPubSub = {
151+
module.exports = {
128152
createPublisher,
129153
createSubscriber
130-
}
131-
132-
module.exports = GcpPubSub;
154+
};

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
},
2626
"homepage": "https://github.com/parse-server-modules/parse-server-gcloud-pubsub#readme",
2727
"dependencies": {
28-
"@google-cloud/pubsub": "^0.5.0",
29-
"uuid": "^2.0.3"
28+
"@google-cloud/pubsub": "^0.9.0",
29+
"uuid": "^3.0.1"
3030
}
3131
}

0 commit comments

Comments
 (0)