Skip to content

Commit 249547b

Browse files
committed
fix(NODE-4763): cache resume token in ChangeStream#tryNext() method
1 parent cd4f6c0 commit 249547b

File tree

3 files changed

+86
-11
lines changed

3 files changed

+86
-11
lines changed

src/change_stream.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,11 @@ export class ChangeStream<
809809
while (true) {
810810
try {
811811
const change = await this.cursor.tryNext();
812-
return change ?? null;
812+
if (!change) {
813+
return null;
814+
}
815+
const processedChange = this._processChange(change);
816+
return processedChange;
813817
} catch (error) {
814818
try {
815819
await this._processErrorIteratorMode(error, this.cursor.id != null);

test/integration/change-streams/change_stream.test.ts

Lines changed: 75 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -377,10 +377,7 @@ describe.only('Change Streams', function () {
377377

378378
async test() {
379379
await initIteratorMode(changeStream);
380-
collection.insertOne({ a: 1 });
381-
382-
const hasNext = await changeStream.hasNext();
383-
expect(hasNext).to.be.true;
380+
await collection.insertOne({ a: 1 });
384381

385382
const change = await changeStream.next();
386383
expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
@@ -392,10 +389,7 @@ describe.only('Change Streams', function () {
392389

393390
async test() {
394391
await initIteratorMode(changeStream);
395-
collection.insertOne({ a: 1 });
396-
397-
const hasNext = await changeStream.hasNext();
398-
expect(hasNext).to.be.true;
392+
await collection.insertOne({ a: 1 });
399393

400394
const change = await changeStream.tryNext();
401395
expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
@@ -408,7 +402,7 @@ describe.only('Change Streams', function () {
408402
async test() {
409403
const willBeChange = once(changeStream, 'change');
410404
await once(changeStream.cursor, 'init');
411-
collection.insertOne({ a: 1 });
405+
await collection.insertOne({ a: 1 });
412406

413407
const [change] = await willBeChange;
414408
expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
@@ -1835,6 +1829,78 @@ describe.only('Change Streams', function () {
18351829
});
18361830
});
18371831
});
1832+
1833+
describe("NODE-4763 - doesn't produce duplicates after resume", function () {
1834+
const resumableError = { code: 6, message: 'host unreachable' };
1835+
1836+
beforeEach(async function () {
1837+
await client.db('admin').command({
1838+
configureFailPoint: is4_2Server(this.configuration.version)
1839+
? 'failCommand'
1840+
: 'failGetMoreAfterCursorCheckout',
1841+
mode: { skip: 1 },
1842+
data: {
1843+
failCommands: ['getMore'],
1844+
errorCode: resumableError.code,
1845+
errmsg: resumableError.message
1846+
}
1847+
} as FailPoint);
1848+
});
1849+
1850+
afterEach(async function () {
1851+
await client.db('admin').command({
1852+
configureFailPoint: is4_2Server(this.configuration.version)
1853+
? 'failCommand'
1854+
: 'failGetMoreAfterCursorCheckout',
1855+
mode: 'off'
1856+
} as FailPoint);
1857+
});
1858+
1859+
describe('when using iterator form', function () {
1860+
it('#next', { requires: { topology: 'replicaset' } }, async function test() {
1861+
await initIteratorMode(changeStream);
1862+
1863+
await collection.insertOne({ a: 1 });
1864+
const change = await changeStream.next();
1865+
expect(change).to.have.property('operationType', 'insert');
1866+
expect(change).to.have.nested.property('fullDocument.a', 1);
1867+
1868+
await collection.insertOne({ a: 2 });
1869+
const change2 = await changeStream.next();
1870+
expect(change2).to.have.property('operationType', 'insert');
1871+
expect(change2).to.have.nested.property('fullDocument.a', 2);
1872+
});
1873+
1874+
it('#tryNext', { requires: { topology: 'replicaset' } }, async function test() {
1875+
await initIteratorMode(changeStream);
1876+
1877+
await collection.insertOne({ a: 1 });
1878+
const change = await changeStream.tryNext();
1879+
expect(change).to.have.property('operationType', 'insert');
1880+
expect(change).to.have.nested.property('fullDocument.a', 1);
1881+
1882+
await collection.insertOne({ a: 2 });
1883+
const change2 = await changeStream.tryNext();
1884+
expect(change2).to.have.property('operationType', 'insert');
1885+
expect(change2).to.have.nested.property('fullDocument.a', 2);
1886+
});
1887+
});
1888+
1889+
it('in an event listener form', { requires: { topology: 'replicaset' } }, async function () {
1890+
const willBeChange = on(changeStream, 'change');
1891+
await once(changeStream.cursor, 'init');
1892+
1893+
await collection.insertOne({ a: 1 });
1894+
const change = await willBeChange.next();
1895+
expect(change.value[0]).to.have.property('operationType', 'insert');
1896+
expect(change.value[0]).to.have.nested.property('fullDocument.a', 1);
1897+
1898+
await collection.insertOne({ a: 2 });
1899+
const change2 = await willBeChange.next();
1900+
expect(change2.value[0]).to.have.property('operationType', 'insert');
1901+
expect(change2.value[0]).to.have.nested.property('fullDocument.a', 2);
1902+
});
1903+
});
18381904
});
18391905

18401906
describe('ChangeStream resumability', function () {

test/tools/utils.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,12 @@ export function extractAuthFromConnectionString(connectionString: string | any[]
207207

208208
export interface FailPoint {
209209
configureFailPoint: 'failCommand' | 'failGetMoreAfterCursorCheckout' | 'maxTimeNeverTimeOut';
210-
mode: { activationProbability: number } | { times: number } | 'alwaysOn' | 'off';
210+
mode:
211+
| { activationProbability: number }
212+
| { times: number }
213+
| { skip: number }
214+
| 'alwaysOn'
215+
| 'off';
211216
data: {
212217
failCommands: string[];
213218
errorCode?: number;

0 commit comments

Comments
 (0)