6 changes: 5 additions & 1 deletion src/change_stream.ts
@@ -809,7 +809,11 @@ export class ChangeStream<
     while (true) {
       try {
         const change = await this.cursor.tryNext();
-        return change ?? null;
+        if (!change) {
+          return null;
+        }
+        const processedChange = this._processChange(change);
+        return processedChange;
       } catch (error) {
         try {
           await this._processErrorIteratorMode(error, this.cursor.id != null);
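The fix hinges on routing the `tryNext` result through `_processChange`, which this diff calls but does not show. Previously `tryNext` returned the raw cursor result without updating the cached resume token, so a resume after a transient error could replay the same event (the NODE-4763 duplicate). Below is a minimal sketch of the invariant such a helper enforces; the free-standing function, the `cacheToken` callback, and the error message are illustrative assumptions, not the driver's actual private method:

```ts
import { MongoChangeStreamError, type ChangeStreamDocument } from 'mongodb';

// Hypothetical sketch only; the driver's private _processChange differs in detail.
function processChange<T extends ChangeStreamDocument>(
  change: T,
  cacheToken: (token: unknown) => void // stand-in for the cursor-side token cache
): T {
  // A change document must carry an _id (the resume token); without one the
  // stream could never resume past this event, so surface an error instead.
  if (change._id == null) {
    throw new MongoChangeStreamError('no resume token present in change document');
  }
  // Caching before returning guarantees that a post-failure resume starts
  // *after* this event rather than replaying it.
  cacheToken(change._id);
  return change;
}
```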
223 changes: 203 additions & 20 deletions test/integration/change-streams/change_stream.test.ts
@@ -370,31 +370,76 @@ describe('Change Streams', function () {
     }
   );

-  it('should cache the change stream resume token using iterator form', {
-    metadata: { requires: { topology: 'replicaset' } },
+  describe('cache the change stream resume token', () => {
+    describe('using iterator form', () => {
+      context('#next', () => {
+        it('caches the resume token on change', {
+          metadata: { requires: { topology: 'replicaset' } },

-    async test() {
-      await initIteratorMode(changeStream);
-      collection.insertOne({ a: 1 });
+          async test() {
+            await initIteratorMode(changeStream);
+            await collection.insertOne({ a: 1 });

-      const hasNext = await changeStream.hasNext();
-      expect(hasNext).to.be.true;
+            const change = await changeStream.next();
+            expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
+          }
+        });

-      const change = await changeStream.next();
-      expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
-    }
-  });
+        it('caches the resume token correctly when preceded by #hasNext', {
+          metadata: { requires: { topology: 'replicaset' } },
+          async test() {
+            await initIteratorMode(changeStream);
+            await collection.insertOne({ a: 1 });

-  it('should cache the change stream resume token using event listener form', {
-    metadata: { requires: { topology: 'replicaset' } },
-    async test() {
-      const willBeChange = once(changeStream, 'change');
-      await once(changeStream.cursor, 'init');
-      collection.insertOne({ a: 1 });
+            await changeStream.hasNext();

-      const [change] = await willBeChange;
-      expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
-    }
+            const change = await changeStream.next();
+            expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
+          }
         });
+      });
+
+      it('#tryNext', {
+        metadata: { requires: { topology: 'replicaset' } },
+
+        async test() {
+          await initIteratorMode(changeStream);
+          await collection.insertOne({ a: 1 });
+
+          const change = await changeStream.tryNext();
+          expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
+        }
+      });
+
+      context('#hasNext', () => {
+        it('does not cache the resume token', {
+          metadata: { requires: { topology: 'replicaset' } },
+          async test() {
+            await initIteratorMode(changeStream);
+            const resumeToken = changeStream.resumeToken;
+
+            await collection.insertOne({ a: 1 });
+
+            const hasNext = await changeStream.hasNext();
+            expect(hasNext).to.be.true;
+
+            expect(changeStream.resumeToken).to.equal(resumeToken);
+          }
+        });
+      });
+    });
+
+    it('should cache using event listener form', {
+      metadata: { requires: { topology: 'replicaset' } },
+      async test() {
+        const willBeChange = once(changeStream, 'change');
+        await once(changeStream.cursor, 'init');
+        await collection.insertOne({ a: 1 });
+
+        const [change] = await willBeChange;
+        expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken);
+      }
+    });
+  });

   it('should error if resume token projected out of change stream document using iterator', {
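The distinction these tests pin down matters to applications: `resumeToken` is commonly checkpointed after each processed event and fed back through the `resumeAfter` watch option, so it must advance on `next()` and `tryNext()` but not on `hasNext()`, which only peeks. A minimal consumer sketch under those assumptions, where `loadToken` and `saveToken` are hypothetical placeholders for a checkpoint store:

```ts
import { MongoClient, type ResumeToken } from 'mongodb';

// Hypothetical persistence helpers, stand-ins for a real checkpoint store.
declare function loadToken(): Promise<ResumeToken | undefined>;
declare function saveToken(token: ResumeToken): Promise<void>;

async function tailEvents(uri: string): Promise<void> {
  const client = new MongoClient(uri);
  const collection = client.db('app').collection('events');

  const token = await loadToken();
  // Resume strictly after the last checkpointed event, if one exists.
  const changeStream = collection.watch([], token ? { resumeAfter: token } : {});

  for await (const change of changeStream) {
    // ... process the change ...
    // resumeToken now reflects the event just returned, because next()
    // caches it before handing the document back.
    await saveToken(changeStream.resumeToken);
  }
}
```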
@@ -1816,6 +1861,144 @@ describe('Change Streams', function () {
       });
     });
   });

+  describe("NODE-4763 - doesn't produce duplicates after resume", function () {
+    let client: MongoClient;
+    let collection: Collection;
+    let changeStream: ChangeStream;
+    let aggregateEvents: CommandStartedEvent[] = [];
+    const resumableError = { code: 6, message: 'host unreachable' };
+
+    beforeEach(async function () {
+      const dbName = 'node-4763';
+      const collectionName = 'test-collection';
+
+      client = this.configuration.newClient({ monitorCommands: true });
+      client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents));
+      collection = client.db(dbName).collection(collectionName);
+
+      changeStream = collection.watch([]);
+    });
+
+    afterEach(async function () {
+      await client.db('admin').command({
+        configureFailPoint: is4_2Server(this.configuration.version)
+          ? 'failCommand'
+          : 'failGetMoreAfterCursorCheckout',
+        mode: 'off'
+      } as FailCommandFailPoint);
+
+      await changeStream.close();
+      await client.close();
+      aggregateEvents = [];
+    });
+
+    describe('when using iterator form', function () {
+      it('#next', { requires: { topology: 'replicaset' } }, async function test() {
+        await initIteratorMode(changeStream);
+
+        await collection.insertOne({ a: 1 });
+        const change = await changeStream.next();
+        expect(change).to.containSubset({
+          operationType: 'insert',
+          fullDocument: { a: 1 }
+        });
+
+        await client.db('admin').command({
+          configureFailPoint: is4_2Server(this.configuration.version)
+            ? 'failCommand'
+            : 'failGetMoreAfterCursorCheckout',
+          mode: { times: 1 },
+          data: {
+            failCommands: ['getMore'],
+            errorCode: resumableError.code,
+            errmsg: resumableError.message
+          }
+        } as FailCommandFailPoint);
+
+        await collection.insertOne({ a: 2 });
+        const change2 = await changeStream.next();
+        expect(change2).to.containSubset({
+          operationType: 'insert',
+          fullDocument: { a: 2 }
+        });
+
+        expect(aggregateEvents.length).to.equal(2);
+      });
+
+      it('#tryNext', { requires: { topology: 'replicaset' } }, async function test() {
+        await initIteratorMode(changeStream);
+
+        await collection.insertOne({ a: 1 });
+        const change = await changeStream.tryNext();
+        expect(change).to.containSubset({
+          operationType: 'insert',
+          fullDocument: { a: 1 }
+        });
+
+        await client.db('admin').command({
+          configureFailPoint: is4_2Server(this.configuration.version)
+            ? 'failCommand'
+            : 'failGetMoreAfterCursorCheckout',
+          mode: { times: 1 },
+          data: {
+            failCommands: ['getMore'],
+            errorCode: resumableError.code,
+            errmsg: resumableError.message
+          }
+        } as FailCommandFailPoint);
+
+        await collection.insertOne({ a: 2 });
+        const change2 = await changeStream.tryNext();
+        expect(change2).to.containSubset({
+          operationType: 'insert',
+          fullDocument: { a: 2 }
+        });
+
+        expect(aggregateEvents.length).to.equal(2);
+      });
+    });
+
+    it('in an event listener form', { requires: { topology: 'replicaset' } }, async function () {
+      const willBeChange = on(changeStream, 'change');
+      await once(changeStream.cursor, 'init');
+
+      await collection.insertOne({ a: 1 });
+      const change = await willBeChange.next();
+      expect(change.value[0]).to.containSubset({
+        operationType: 'insert',
+        fullDocument: { a: 1 }
+      });
+
+      await client.db('admin').command({
+        configureFailPoint: is4_2Server(this.configuration.version)
+          ? 'failCommand'
+          : 'failGetMoreAfterCursorCheckout',
+        mode: { times: 1 },
+        data: {
+          failCommands: ['getMore'],
+          errorCode: resumableError.code,
+          errmsg: resumableError.message
+        }
+      } as FailCommandFailPoint);
+
+      // There's an inherent race condition here: we need to make sure that the
+      // aggregates that succeed when resuming the change stream don't return the
+      // change event, so we defer the insert until some time after the change
+      // stream has received the first change. 2000ms is long enough for the
+      // change stream to attempt to resume and fail once before exhausting the
+      // failpoint and succeeding.
+      await sleep(2000);
+      await collection.insertOne({ a: 2 });
+
+      const change2 = await willBeChange.next();
+      expect(change2.value[0]).to.containSubset({
+        operationType: 'insert',
+        fullDocument: { a: 2 }
+      });
+
+      expect(aggregateEvents.length).to.equal(2);
+    });
+  });

   describe('ChangeStream resumability', function () {
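The `expect(aggregateEvents.length).to.equal(2)` assertions carry the proof: one `aggregate` opens the stream and exactly one more is issued by the resume attempt, so an extra `aggregate` or a replayed event would signal a regression. `filterForCommands` comes from the driver's test utilities; a sketch of the shape it plausibly has (the real helper may differ in detail):

```ts
import type { CommandStartedEvent } from 'mongodb';

// Sketch of a command-collecting helper in the spirit of filterForCommands:
// it returns a listener that copies matching commandStarted events into a
// caller-owned array for later assertions.
function filterForCommands(
  commands: string[],
  bag: CommandStartedEvent[]
): (event: CommandStartedEvent) => void {
  return event => {
    if (commands.includes(event.commandName)) {
      bag.push(event);
    }
  };
}

// Usage mirrors the beforeEach hook above:
// client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents));
```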