Skip to content

Commit b2c89ac

Browse files
committed
test: Add RPC tests.
1 parent 17210f9 commit b2c89ac

File tree

1 file changed

+145
-19
lines changed

1 file changed

+145
-19
lines changed

test/integrationTest.ts

Lines changed: 145 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import chaiJest from 'chai-jest';
44
import pEvent from 'p-event';
55
import { defer, timeout } from 'promise-tools';
66
import amqp from '../src';
7+
import { IAmqpConnectionManager } from '../src/AmqpConnectionManager';
78

89
chai.use(chaiJest);
910

@@ -17,74 +18,69 @@ const { expect } = chai;
1718
*
1819
*/
1920
describe('Integration tests', () => {
20-
beforeEach(() => {
21-
// TODO: Uncomment this if you're using `jest.spyOn()` to restore mocks between tests
22-
// jest.restoreAllMocks();
21+
let connection: IAmqpConnectionManager;
22+
23+
afterEach(async () => {
24+
await connection?.close();
2325
});
2426

2527
it('should connect to the broker', async () => {
2628
// Create a new connection manager
27-
const connection = amqp.connect(['amqp://localhost']);
29+
connection = amqp.connect(['amqp://localhost']);
2830
await timeout(pEvent(connection, 'connect'), 3000);
29-
await connection.close();
3031
});
3132

3233
it('should connect to the broker with a username and password', async () => {
3334
// Create a new connection manager
34-
const connection = amqp.connect(['amqp://guest:guest@localhost:5672']);
35+
connection = amqp.connect(['amqp://guest:guest@localhost:5672']);
3536
await timeout(pEvent(connection, 'connect'), 3000);
36-
await connection.close();
3737
});
3838

3939
it('should connect to the broker with a string', async () => {
4040
// Create a new connection manager
41-
const connection = amqp.connect('amqp://guest:guest@localhost:5672');
41+
connection = amqp.connect('amqp://guest:guest@localhost:5672');
4242
await timeout(pEvent(connection, 'connect'), 3000);
43-
await connection.close();
4443
});
4544

4645
it('should connect to the broker with a amqp.Connect object', async () => {
4746
// Create a new connection manager
48-
const connection = amqp.connect({
47+
connection = amqp.connect({
4948
protocol: 'amqp',
5049
hostname: 'localhost',
5150
port: 5672,
5251
vhost: '/',
5352
});
5453
await timeout(pEvent(connection, 'connect'), 3000);
55-
await connection.close();
5654
});
5755

5856
it('should connect to the broker with an url/options object', async () => {
5957
// Create a new connection manager
60-
const connection = amqp.connect({
58+
connection = amqp.connect({
6159
url: 'amqp://guest:guest@localhost:5672',
6260
});
6361
await timeout(pEvent(connection, 'connect'), 3000);
64-
await connection.close();
6562
});
6663

6764
it('should connect to the broker with a string with options', async () => {
6865
// Create a new connection manager
69-
const connection = amqp.connect(
66+
connection = amqp.connect(
7067
'amqp://guest:guest@localhost:5672/%2F?heartbeat=10&channelMax=100'
7168
);
7269
await timeout(pEvent(connection, 'connect'), 3000);
73-
await connection.close();
7470
});
7571

7672
it('send and receive messages', async () => {
7773
const queueName = 'testQueue1';
7874
const content = `hello world - ${Date.now()}`;
7975

8076
// Create a new connection manager
81-
const connection = amqp.connect(['amqp://localhost']);
77+
connection = amqp.connect(['amqp://localhost']);
8278

8379
// Ask the connection manager for a ChannelWrapper. Specify a setup function to
8480
// run every time we reconnect to the broker.
8581
const sendChannel = connection.createChannel({
8682
setup: async (channel: ConfirmChannel) => {
87-
await channel.assertQueue(queueName, { durable: false });
83+
await channel.assertQueue(queueName, { durable: false, autoDelete: true });
8884
},
8985
});
9086

@@ -94,7 +90,7 @@ describe('Integration tests', () => {
9490
setup: async (channel: ConfirmChannel) => {
9591
// `channel` here is a regular amqplib `ConfirmChannel`.
9692
// Note that `this` here is the channelWrapper instance.
97-
await channel.assertQueue(queueName, { durable: false });
93+
await channel.assertQueue(queueName, { durable: false, autoDelete: true });
9894
await channel.consume(
9995
queueName,
10096
(message) => {
@@ -115,13 +111,143 @@ describe('Integration tests', () => {
115111
},
116112
});
117113

114+
await timeout(pEvent(connection, 'connect'), 3000);
115+
118116
await sendChannel.sendToQueue(queueName, content);
119117

120118
const result = await timeout(rxPromise.promise, 3000);
121119
expect(result.content.toString()).to.equal(content);
122120

123121
await sendChannel.close();
124122
await receiveWrapper.close();
125-
await connection.close();
123+
});
124+
125+
it('RPC', async () => {
126+
const queueName = 'testQueueRpc';
127+
128+
// Create a new connection manager
129+
connection = amqp.connect(['amqp://localhost']);
130+
131+
let rpcClientQueueName = '';
132+
133+
const result = defer<string | undefined>();
134+
135+
// Ask the connection manager for a ChannelWrapper. Specify a setup function to
136+
// run every time we reconnect to the broker.
137+
const rpcClient = connection.createChannel({
138+
setup: async (channel: ConfirmChannel) => {
139+
const qok = await channel.assertQueue('', { exclusive: true });
140+
rpcClientQueueName = qok.queue;
141+
142+
await channel.consume(
143+
rpcClientQueueName,
144+
(message) => {
145+
result.resolve(message?.content.toString());
146+
},
147+
{ noAck: true }
148+
);
149+
},
150+
});
151+
152+
const rpcServer = connection.createChannel({
153+
setup: async (channel: ConfirmChannel) => {
154+
await channel.assertQueue(queueName, { durable: false, autoDelete: true });
155+
await channel.prefetch(1);
156+
await channel.consume(
157+
queueName,
158+
(message) => {
159+
if (message) {
160+
channel.sendToQueue(message.properties.replyTo, Buffer.from('world'), {
161+
correlationId: message.properties.correlationId,
162+
});
163+
}
164+
},
165+
{ noAck: true }
166+
);
167+
},
168+
});
169+
170+
await timeout(pEvent(connection, 'connect'), 3000);
171+
await timeout(rpcClient.waitForConnect(), 3000);
172+
await timeout(rpcServer.waitForConnect(), 3000);
173+
174+
// Send message from client to server.
175+
await rpcClient.sendToQueue(queueName, 'hello', {
176+
correlationId: 'test',
177+
replyTo: rpcClientQueueName,
178+
messageId: 'asdkasldk',
179+
});
180+
181+
const reply = await result.promise;
182+
expect(reply).to.equal('world');
183+
184+
await rpcClient.close();
185+
await rpcServer.close();
186+
});
187+
188+
it('direct-reply-to', async () => {
189+
// See https://www.rabbitmq.com/direct-reply-to.html
190+
const rpcClientQueueName = 'amq.rabbitmq.reply-to';
191+
const queueName = 'testQueueRpc';
192+
193+
// Create a new connection manager
194+
connection = amqp.connect(['amqp://localhost']);
195+
196+
const result = defer<string | undefined>();
197+
198+
connection.on('disconnect', ({ err }) => {
199+
if (err) {
200+
console.log(err);
201+
}
202+
});
203+
204+
// Ask the connection manager for a ChannelWrapper. Specify a setup function to
205+
// run every time we reconnect to the broker.
206+
const rpcClient = connection.createChannel({
207+
setup: async (channel: ConfirmChannel) => {
208+
await channel.consume(
209+
rpcClientQueueName,
210+
(message) => {
211+
result.resolve(message?.content.toString());
212+
},
213+
{ noAck: true }
214+
);
215+
},
216+
});
217+
218+
const rpcServer = connection.createChannel({
219+
setup: async (channel: ConfirmChannel) => {
220+
await channel.assertQueue(queueName, { durable: false, autoDelete: true });
221+
await channel.prefetch(1);
222+
await channel.consume(
223+
queueName,
224+
(message) => {
225+
if (message) {
226+
channel.sendToQueue(message.properties.replyTo, Buffer.from('world'), {
227+
correlationId: message.properties.correlationId,
228+
});
229+
}
230+
},
231+
{ noAck: true }
232+
);
233+
},
234+
});
235+
236+
await timeout(pEvent(connection, 'connect'), 3000);
237+
await timeout(rpcServer.waitForConnect(), 3000);
238+
await timeout(rpcClient.waitForConnect(), 3000);
239+
240+
// Send message from client to server.
241+
await rpcClient.sendToQueue(queueName, 'hello', {
242+
correlationId: 'test',
243+
replyTo: rpcClientQueueName,
244+
messageId: 'asdkasldk',
245+
});
246+
247+
const reply = await result.promise;
248+
expect(reply).to.equal('world');
249+
250+
await rpcClient.close();
251+
await rpcServer.close();
126252
});
127253
});

0 commit comments

Comments
 (0)