diff --git a/CHANGELOG.md b/CHANGELOG.md index e036afa2..0bbeaa6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +# [3.6.0](https://github.com/jwalton/node-amqp-connection-manager/compare/v3.5.2...v3.6.0) (2021-08-27) + +### Features + +- reconnect and cancelAll consumers ([fb0c00b](https://github.com/jwalton/node-amqp-connection-manager/commit/fb0c00becc224ffedd28e810cbb314187d21efdb)) + ## [3.5.2](https://github.com/jwalton/node-amqp-connection-manager/compare/v3.5.1...v3.5.2) (2021-08-26) ### Bug Fixes diff --git a/package.json b/package.json index a84f5714..160a6eae 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "amqp-connection-manager", - "version": "3.5.2", + "version": "3.6.0", "description": "Auto-reconnect and round robin support for amqplib.", "module": "./dist/esm/index.js", "main": "./dist/cjs/index.js", diff --git a/src/AmqpConnectionManager.ts b/src/AmqpConnectionManager.ts index 8eeaf014..cf6ec12d 100644 --- a/src/AmqpConnectionManager.ts +++ b/src/AmqpConnectionManager.ts @@ -1,5 +1,5 @@ import amqp, { Connection } from 'amqplib'; -import { EventEmitter } from 'events'; +import { EventEmitter, once } from 'events'; import { TcpSocketConnectOpts } from 'net'; import pb from 'promise-breaker'; import { ConnectionOptions } from 'tls'; @@ -82,6 +82,8 @@ export interface IAmqpConnectionManager { addListener(event: 'unblocked', listener: () => void): this; addListener(event: 'disconnect', listener: (arg: { err: Error }) => void): this; + listeners(eventName: string | symbol): any; + on(event: string, listener: (...args: any[]) => void): this; on(event: 'connect', listener: ConnectListener): this; on(event: 'blocked', listener: (arg: { reason: string }) => void): this; @@ -108,6 +110,8 @@ export interface IAmqpConnectionManager { removeListener(event: string, listener: (...args: any[]) => void): this; + connect(options?: { timeout?: number }): Promise; + reconnect(): void; createChannel(options?: CreateChannelOpts): ChannelWrapper; close(): Promise; isConnected(): boolean; @@ -196,8 +200,43 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp this.setMaxListeners(0); this._findServers = options.findServers || (() => Promise.resolve(urls)); + } + /** + * Start the connect retries and await the first connect result. Even if the initial connect fails or timeouts, the + * reconnect attempts will continue in the background. + * @param [options={}] - + * @param [options.timeout] - Time to wait for initial connect + */ + async connect({ timeout }: { timeout?: number } = {}): Promise { this._connect(); + + let reject: (reason?: any) => void; + const onDisconnect = ({ err }: { err: any }) => { + // Ignore disconnects caused by dead servers etc., but throw on operational errors like bad credentials. + if (err.isOperational) { + reject(err); + } + }; + + try { + await Promise.race([ + once(this, 'connect'), + new Promise((_resolve, innerReject) => { + reject = innerReject; + this.on('disconnect', onDisconnect); + }), + ...(timeout + ? [ + wait(timeout).promise.then(() => { + throw new Error('amqp-connection-manager: connect timeout'); + }), + ] + : []), + ]); + } finally { + this.removeListener('disconnect', onDisconnect); + } } // `options` here are any options that can be passed to ChannelWrapper. diff --git a/src/ChannelWrapper.ts b/src/ChannelWrapper.ts index 6a050f45..953cab88 100644 --- a/src/ChannelWrapper.ts +++ b/src/ChannelWrapper.ts @@ -44,6 +44,17 @@ interface SendToQueueMessage { reject: (err: Error) => void; } +interface ConsumerOptions extends amqplib.Options.Consume { + prefetch?: number +} + +interface Consumer { + consumerTag: string | null; + queue: string; + onMessage: (msg: amqplib.ConsumeMessage) => void; + options: ConsumerOptions; +} + type Message = PublishMessage | SendToQueueMessage; const IRRECOVERABLE_ERRORS = [ @@ -87,6 +98,8 @@ export default class ChannelWrapper extends EventEmitter { private _unconfirmedMessages: Message[] = []; /** Reason code during publish or sendtoqueue messages. */ private _irrecoverableCode: number | undefined; + /** Consumers which will be reconnected on channel errors etc. */ + private _consumers: Consumer[] = []; /** * The currently connected channel. Note that not all setup functions @@ -324,6 +337,8 @@ export default class ChannelWrapper extends EventEmitter { // Array of setup functions to call. this._setups = []; + this._consumers = []; + if (options.setup) { this._setups.push(options.setup); } @@ -359,10 +374,13 @@ export default class ChannelWrapper extends EventEmitter { this.emit('error', err, { name: this.name }); }) ) - ).then(() => { - this._settingUp = undefined; - }); - + ) + .then(() => { + return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c))); + }) + .then(() => { + this._settingUp = undefined; + }); await this._settingUp; if (!this._channel) { @@ -581,6 +599,89 @@ export default class ChannelWrapper extends EventEmitter { } } + /** + * Setup a consumer + * This consumer will be reconnected on cancellation and channel errors. + */ + async consume( + queue: string, + onMessage: Consumer['onMessage'], + options: ConsumerOptions = {} + ): Promise { + const consumer: Consumer = { + consumerTag: null, + queue, + onMessage, + options, + }; + this._consumers.push(consumer); + await this._consume(consumer); + } + + private async _consume(consumer: Consumer): Promise { + if (!this._channel) { + return; + } + + const { prefetch, ...options } = consumer.options; + if (typeof prefetch === 'number') { + this._channel.prefetch(prefetch, false); + } + + const { consumerTag } = await this._channel.consume( + consumer.queue, + (msg) => { + if (!msg) { + consumer.consumerTag = null; + this._reconnectConsumer(consumer).catch((err) => { + if (err.isOperational && err.message.includes('BasicConsume; 404')) { + // Ignore errors caused by queue not declared. In + // those cases the connection will reconnect and + // then consumers reestablished. The full reconnect + // might be avoided if we assert the queue again + // before starting to consume. + return; + } + throw err; + }); + return; + } + consumer.onMessage(msg); + }, + options + ); + consumer.consumerTag = consumerTag; + } + + private async _reconnectConsumer(consumer: Consumer): Promise { + if (!this._consumers.includes(consumer)) { + // Intentionally canceled + return; + } + await this._consume(consumer); + } + + /** + * Cancel all consumers + */ + async cancelAll(): Promise { + const consumers = this._consumers; + this._consumers = []; + if (!this._channel) { + return; + } + + const channel = this._channel; + await Promise.all( + consumers.reduce((acc, consumer) => { + if (consumer.consumerTag) { + acc.push(channel.cancel(consumer.consumerTag)); + } + return acc; + }, []) + ); + } + /** Send an `ack` to the underlying channel. */ ack(message: amqplib.Message, allUpTo?: boolean): void { this._channel && this._channel.ack(message, allUpTo); diff --git a/src/index.ts b/src/index.ts index 7e32324b..01e7224b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,9 +15,15 @@ export function connect( urls: ConnectionUrl | ConnectionUrl[] | undefined | null, options?: AmqpConnectionManagerOptions ): IAmqpConnectionManager { - return new AmqpConnectionManager(urls, options); + const conn = new AmqpConnectionManager(urls, options); + conn.connect().catch(() => { + /* noop */ + }); + return conn; } +export { AmqpConnectionManager as AmqpConnectionManagerClass }; + const amqp = { connect }; export default amqp; diff --git a/test/AmqpConnectionManagerTest.ts b/test/AmqpConnectionManagerTest.ts index 560a60c7..47dd3300 100644 --- a/test/AmqpConnectionManagerTest.ts +++ b/test/AmqpConnectionManagerTest.ts @@ -27,6 +27,7 @@ describe('AmqpConnectionManager', function () { it('should establish a connection to a broker', async () => { amqp = new AmqpConnectionManager('amqp://localhost'); + amqp.connect(); const [{ connection, url }] = await once(amqp, 'connect'); expect(url, 'url').to.equal('amqp://localhost'); expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5'); @@ -37,6 +38,7 @@ describe('AmqpConnectionManager', function () { protocol: 'amqp', hostname: 'localhost', }); + amqp.connect(); const [{ connection, url }] = await once(amqp, 'connect'); expect(url, 'url').to.eql({ protocol: 'amqp', @@ -51,7 +53,7 @@ describe('AmqpConnectionManager', function () { it('should establish a url object based connection to a broker', async () => { amqp = new AmqpConnectionManager({ url: 'amqp://localhost' }); - + amqp.connect(); const [{ connection, url }] = await once(amqp, 'connect'); expect(url, 'url').to.equal('amqp://localhost'); expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5'); @@ -59,6 +61,7 @@ describe('AmqpConnectionManager', function () { it('should close connection to a broker', async () => { amqp = new AmqpConnectionManager('amqp://localhost'); + amqp.connect(); const [{ connection, url }] = await once(amqp, 'connect'); expect(url, 'url').to.equal('amqp://localhost'); expect((connection as any).url, 'connection.url').to.equal('amqp://localhost?heartbeat=5'); @@ -77,6 +80,7 @@ describe('AmqpConnectionManager', function () { let connected = false; amqp = new AmqpConnectionManager('amqp://localhost'); + amqp.connect(); // Connection should not yet be established expect(amqp.connection, 'current connection').to.equal(undefined); // Connection should be pending though @@ -123,6 +127,7 @@ describe('AmqpConnectionManager', function () { return Promise.resolve('amqp://localhost'); }, }); + amqp.connect(); const [{ connection, url }] = await once(amqp, 'connect'); expect(url, 'url').to.equal('amqp://localhost'); expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5'); @@ -134,6 +139,7 @@ describe('AmqpConnectionManager', function () { return Promise.resolve({ url: 'amqp://localhost' }); }, }); + amqp.connect(); const [{ connection, url }] = await once(amqp, 'connect'); expect(url, 'url').to.equal('amqp://localhost'); expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5'); @@ -145,13 +151,29 @@ describe('AmqpConnectionManager', function () { return Promise.resolve(null); }, }); + amqp.connect(); const [{ err }] = await once(amqp, 'disconnect'); expect(err.message).to.contain('No servers found'); return amqp?.close(); }); + it('should timeout connect', async () => { + jest.spyOn(origAmpq, 'connect').mockImplementation((): any => { + return promiseTools.delay(200); + }); + amqp = new AmqpConnectionManager('amqp://localhost'); + let err; + try { + await amqp.connect({ timeout: 0.1 }); + } catch (error) { + err = error; + } + expect(err.message).to.equal('amqp-connection-manager: connect timeout'); + }); + it('should work with a URL with a query', async () => { amqp = new AmqpConnectionManager('amqp://localhost?frameMax=0x1000'); + amqp.connect(); const [{ connection }] = await once(amqp, 'connect'); expect(connection.url, 'connection.url').to.equal( 'amqp://localhost?frameMax=0x1000&heartbeat=5' @@ -171,6 +193,7 @@ describe('AmqpConnectionManager', function () { amqp = new AmqpConnectionManager(['amqp://rabbit1', 'amqp://rabbit2'], { heartbeatIntervalInSeconds: 0.01, }); + amqp.connect(); let disconnectEventsSeen = 0; amqp.on('disconnect', function () { @@ -196,10 +219,10 @@ describe('AmqpConnectionManager', function () { let disconnectsSeen = 0; amqp.on('disconnect', () => disconnectsSeen++); - await once(amqp, 'connect'); + await amqp.connect(); amqplib.kill(); - await once(amqp, 'connect'); + await amqp.connect(); expect(disconnectsSeen).to.equal(1); }); @@ -211,7 +234,7 @@ describe('AmqpConnectionManager', function () { let disconnectsSeen = 0; amqp.on('disconnect', () => disconnectsSeen++); - await once(amqp, 'connect'); + await amqp.connect(); // Close the connection nicely amqplib.simulateRemoteClose(); @@ -222,6 +245,7 @@ describe('AmqpConnectionManager', function () { it('should know if it is connected or not', async () => { amqp = new AmqpConnectionManager('amqp://localhost'); + amqp.connect(); expect(amqp.isConnected()).to.be.false; @@ -231,7 +255,7 @@ describe('AmqpConnectionManager', function () { it('should be able to manually reconnect', async () => { amqp = new AmqpConnectionManager('amqp://localhost'); - await once(amqp, 'connect'); + await amqp.connect(); amqp.reconnect(); await once(amqp, 'disconnect'); @@ -240,13 +264,14 @@ describe('AmqpConnectionManager', function () { it('should throw on manual reconnect after close', async () => { amqp = new AmqpConnectionManager('amqp://localhost'); - await once(amqp, 'connect'); - await amqp.close() - expect(amqp.reconnect).to.throw() - }) + await amqp.connect(); + await amqp.close(); + expect(amqp.reconnect).to.throw(); + }); it('should create and clean up channel wrappers', async function () { amqp = new AmqpConnectionManager('amqp://localhost'); + await amqp.connect(); const channel = amqp.createChannel({ name: 'test-chan' }); // Channel should register with connection manager @@ -264,6 +289,7 @@ describe('AmqpConnectionManager', function () { it('should clean up channels on close', async function () { amqp = new AmqpConnectionManager('amqp://localhost'); + await amqp.connect(); amqp.createChannel({ name: 'test-chan' }); // Channel should register with connection manager @@ -286,7 +312,7 @@ describe('AmqpConnectionManager', function () { let connectsSeen = 0; amqp.on('connect', () => connectsSeen++); - await once(amqp, 'connect'); + await amqp.connect(); // Close the manager await amqp?.close(); @@ -308,7 +334,7 @@ describe('AmqpConnectionManager', function () { amqp.on('unblocked', () => unblockSeen++); - await once(amqp, 'connect'); + await amqp.connect(); // Close the connection nicely amqplib.simulateRemoteBlock(); amqplib.simulateRemoteUnblock(); diff --git a/test/ChannelWrapperTest.ts b/test/ChannelWrapperTest.ts index 2d7d1ef5..56cea05c 100644 --- a/test/ChannelWrapperTest.ts +++ b/test/ChannelWrapperTest.ts @@ -266,9 +266,7 @@ describe('ChannelWrapper', function () { }); }); - it('should publish messages to the underlying channel with callbacks', function (done: ( - err?: Error - ) => void) { + it('should publish messages to the underlying channel with callbacks', function (done) { connectionManager.simulateConnect(); const channelWrapper = new ChannelWrapper(connectionManager); channelWrapper.waitForConnect(function (err) { @@ -970,6 +968,180 @@ describe('ChannelWrapper', function () { // Final message should have been published to the underlying queue. expect(queue.length).to.equal(2); }); + + it('should consume messages', async function () { + let onMessage: any = null; + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + async setup(channel: amqplib.ConfirmChannel) { + channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => { + onMessage = onMsg; + return Promise.resolve({ consumerTag: 'abc' }); + }); + }, + }); + await channelWrapper.waitForConnect(); + + const messages: any[] = []; + await channelWrapper.consume( + 'queue', + (msg) => { + messages.push(msg); + }, + { noAck: true } + ); + + onMessage(1); + onMessage(2); + onMessage(3); + expect(messages).to.deep.equal([1, 2, 3]); + }); + + it('should reconnect consumer on consumer cancellation', async function () { + let onMessage: any = null; + let consumerTag = 0; + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + async setup(channel: amqplib.ConfirmChannel) { + channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => { + onMessage = onMsg; + return Promise.resolve({ consumerTag: `${consumerTag++}` }); + }); + }, + }); + await channelWrapper.waitForConnect(); + + const messages: any[] = []; + await channelWrapper.consume('queue', (msg) => { + messages.push(msg); + }); + + onMessage(1); + onMessage(null); // simulate consumer cancel + onMessage(2); + onMessage(null); // simulate second cancel + onMessage(3); + + expect(messages).to.deep.equal([1, 2, 3]); + expect(consumerTag).to.equal(3); + }); + + it('should reconnect consumers on channel error', async function () { + let onQueue1: any = null; + let onQueue2: any = null; + let consumerTag = 0; + + // Define a prefetch function here, because it will otherwise be + // unique for each new channel + const prefetchFn = jest + .fn() + .mockImplementation((_prefetch: number, _isGlobal: boolean) => {}); + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + async setup(channel: amqplib.ConfirmChannel) { + channel.prefetch = prefetchFn; + channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => { + if (queue === 'queue1') { + onQueue1 = onMsg; + } else { + onQueue2 = onMsg; + } + return Promise.resolve({ consumerTag: `${consumerTag++}` }); + }); + }, + }); + await channelWrapper.waitForConnect(); + + const queue1: any[] = []; + await channelWrapper.consume( + 'queue1', + (msg) => { + queue1.push(msg); + }, + { noAck: true, prefetch: 10 }, + ); + + const queue2: any[] = []; + await channelWrapper.consume('queue2', (msg) => { + queue2.push(msg); + }); + + onQueue1(1); + onQueue2(1); + + connectionManager.simulateDisconnect(); + connectionManager.simulateConnect(); + await channelWrapper.waitForConnect(); + + onQueue1(2); + onQueue2(2); + + expect(queue1).to.deep.equal([1, 2]); + expect(queue2).to.deep.equal([1, 2]); + expect(consumerTag).to.equal(4); + expect(prefetchFn).to.have.beenCalledTimes(2); + expect(prefetchFn).to.have.beenNthCalledWith(1, 10, false); + expect(prefetchFn).to.have.beenNthCalledWith(2, 10, false); + }); + + it('should be able to cancel all consumers', async function () { + let onQueue1: any = null; + let onQueue2: any = null; + let consumerTag = 0; + const canceledTags: number[] = []; + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + async setup(channel: amqplib.ConfirmChannel) { + channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => { + if (queue === 'queue1') { + onQueue1 = onMsg; + } else { + onQueue2 = onMsg; + } + return Promise.resolve({ consumerTag: `${consumerTag++}` }); + }); + channel.cancel = jest.fn().mockImplementation((consumerTag) => { + canceledTags.push(consumerTag); + if (consumerTag === '0') { + onQueue1(null); + } else if (consumerTag === '1') { + onQueue2(null); + } + return Promise.resolve(); + }); + }, + }); + await channelWrapper.waitForConnect(); + + const queue1: any[] = []; + await channelWrapper.consume('queue1', (msg) => { + queue1.push(msg); + }); + + const queue2: any[] = []; + await channelWrapper.consume('queue2', (msg) => { + queue2.push(msg); + }); + + onQueue1(1); + onQueue2(1); + + await channelWrapper.cancelAll(); + + // Consumers shouldn't be resumed after reconnect when canceled + connectionManager.simulateDisconnect(); + connectionManager.simulateConnect(); + await channelWrapper.waitForConnect(); + + expect(queue1).to.deep.equal([1]); + expect(queue2).to.deep.equal([1]); + expect(consumerTag).to.equal(2); + expect(canceledTags).to.deep.equal(['0', '1']); + }); }); /** Returns the arguments of the most recent call to this mock. */ diff --git a/test/fixtures.ts b/test/fixtures.ts index 95860680..ff756077 100644 --- a/test/fixtures.ts +++ b/test/fixtures.ts @@ -2,7 +2,7 @@ /* eslint-disable @typescript-eslint/explicit-module-boundary-types */ import { Connection, Message, Options, Replies } from 'amqplib'; -import { EventEmitter } from 'events'; +import { EventEmitter, once } from 'events'; import { IAmqpConnectionManager } from '../src/AmqpConnectionManager'; import ChannelWrapper, { CreateChannelOpts } from '../src/ChannelWrapper'; @@ -146,6 +146,12 @@ export class FakeConfirmChannel extends EventEmitter { close = jest.fn().mockImplementation(async (): Promise => { this.emit('close'); }); + + consume = jest.fn().mockImplementation(async (): Promise => { + return { consumerTag: 'abc' }; + }); + + prefetch = jest.fn().mockImplementation((_prefetch: number, _isGlobal: boolean): void => {}); } export class FakeConnection extends EventEmitter { @@ -188,6 +194,15 @@ export class FakeAmqpConnectionManager extends EventEmitter implements IAmqpConn return 0; } + async connect(): Promise { + await Promise.all([once(this, 'connect'), this.simulateConnect()]); + } + + reconnect(): void { + this.simulateDisconnect(); + this.simulateConnect(); + } + isConnected() { return this.connected; } diff --git a/test/importTest.ts b/test/importTest.ts index 1a354647..d8f60350 100644 --- a/test/importTest.ts +++ b/test/importTest.ts @@ -1,9 +1,13 @@ import { expect } from 'chai'; -import amqp from '../src'; +import amqp, { AmqpConnectionManagerClass as AmqpConnectionManager } from '../src'; describe('import test', function () { it('should let you import as default (#51)', function () { expect(amqp).to.exist; expect(amqp.connect).to.exist; }); + + it('should let you import class', function () { + new AmqpConnectionManager('url'); + }); }); diff --git a/test/integrationTest.ts b/test/integrationTest.ts index ed589b9f..98c787df 100644 --- a/test/integrationTest.ts +++ b/test/integrationTest.ts @@ -3,7 +3,7 @@ import chai from 'chai'; import chaiJest from 'chai-jest'; import pEvent from 'p-event'; import { defer, timeout } from 'promise-tools'; -import amqp from '../src'; +import amqp, { AmqpConnectionManagerClass as AmqpConnectionManager } from '../src'; import { IAmqpConnectionManager } from '../src/AmqpConnectionManager'; chai.use(chaiJest); @@ -69,6 +69,19 @@ describe('Integration tests', () => { await timeout(pEvent(connection, 'connect'), 3000); }); + // This test might cause jest to complain about leaked resources due to the bug described and fixed by: + // https://github.com/squaremo/amqp.node/pull/584 + it('should throw on awaited connect with wrong password', async () => { + connection = new AmqpConnectionManager('amqp://guest:wrong@localhost'); + let err; + try { + await connection.connect(); + } catch (error) { + err = error; + } + expect(err.message).to.contain('ACCESS-REFUSED'); + }); + it('send and receive messages', async () => { const queueName = 'testQueue1'; const content = `hello world - ${Date.now()}`;