-
Notifications
You must be signed in to change notification settings - Fork 26
DOCSP-34008: Split large change events #374
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
811ec32
a9c6e48
9904225
b840209
88dcab3
2cdf355
78209e3
7b6e0c7
db64098
5764b75
949557d
a1666a0
9a03ab7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -111,6 +111,7 @@ for only specified change events. Create the pipeline by using the | |
| You can specify the following aggregation stages in the ``pipeline`` parameter: | ||
|
|
||
| - ``$addFields`` | ||
| - ``$changeStreamSplitLargeEvent`` | ||
| - ``$match`` | ||
| - ``$project`` | ||
| - ``$replaceRoot`` | ||
|
|
@@ -119,9 +120,19 @@ You can specify the following aggregation stages in the ``pipeline`` parameter: | |
| - ``$set`` | ||
| - ``$unset`` | ||
|
|
||
| To learn how to build an aggregation pipeline by using the | ||
| ``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in | ||
| the Operations with Builders guide. | ||
| .. tip:: | ||
|
|
||
| To learn how to build an aggregation pipeline by using the | ||
| ``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in | ||
| the Operations with Builders guide. | ||
|
|
||
| To learn more about modifying your change stream output, see the | ||
| :manual:`Modify Change Stream Output | ||
| </changeStreams/#modify-change-stream-output>` section in the {+mdb-server+} | ||
| manual. | ||
|
|
||
| Monitor Update Events Example | ||
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||
|
|
||
| The following example uses the ``pipeline`` parameter to open a change stream | ||
| that records only update operations. Select the :guilabel:`Asynchronous` or :guilabel:`Synchronous` tab to see the | ||
|
|
@@ -145,10 +156,64 @@ corresponding code. | |
| :end-before: end-change-stream-pipeline | ||
| :language: csharp | ||
|
|
||
| To learn more about modifying your change stream output, see the | ||
| :manual:`Modify Change Stream Output | ||
| </changeStreams/#modify-change-stream-output>` section in the {+mdb-server+} | ||
| manual. | ||
| Split Large Change Events Example | ||
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||
|
|
||
| If your application generates change events that exceed 16 MB in size, the | ||
| server returns a ``BSONObjectTooLarge`` error. To avoid this error, you can use | ||
| the ``$changeStreamSplitLargeEvent`` pipeline stage to split the events | ||
| into smaller fragments. | ||
|
|
||
| After you receive the change stream event fragments, you can use the | ||
|
||
| following helper methods to reassemble the fragments into a single | ||
| change stream document: | ||
|
|
||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably should mention that fragments reassembling is optional (but probably needed), and the split events can be watched as usual events as they arrive. |
||
| .. tabs:: | ||
|
|
||
| .. tab:: Asynchronous | ||
| :tabid: split-event-helpers-async | ||
|
|
||
| .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs | ||
| :start-after: start-split-event-helpers-async | ||
| :end-before: end-split-event-helpers-async | ||
| :language: csharp | ||
|
|
||
| .. tab:: Synchronous | ||
| :tabid: split-event-helpers-sync | ||
|
|
||
| .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs | ||
| :start-after: start-split-event-helpers-sync | ||
| :end-before: end-split-event-helpers-sync | ||
| :language: csharp | ||
|
|
||
| This example instructs the driver to watch for changes and split | ||
| change events that exceed the 16 MB limit. The code prints the | ||
| change document for each event and calls the preceding helper methods to | ||
| reassemble any event fragments: | ||
|
|
||
| .. tabs:: | ||
|
|
||
| .. tab:: Asynchronous | ||
| :tabid: change-stream-split-async | ||
|
|
||
| .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs | ||
| :start-after: start-split-change-event-async | ||
| :end-before: end-split-change-event-async | ||
| :language: csharp | ||
|
|
||
| .. tab:: Synchronous | ||
| :tabid: change-stream-split-sync | ||
|
|
||
| .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs | ||
| :start-after: start-split-change-event-sync | ||
| :end-before: end-split-change-event-sync | ||
| :language: csharp | ||
|
|
||
| .. tip:: | ||
|
|
||
| To learn more about splitting large change events, see | ||
| :manual:`$changeStreamSplitLargeEvent </reference/operator/aggregation/changeStreamSplitLargeEvent/>` | ||
| in the {+mdb-server+} manual. | ||
|
|
||
| Modify ``Watch()`` Behavior | ||
| --------------------------- | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,7 +42,7 @@ await cursor.ForEachAsync(change => | |
| .Match(change => change.OperationType == ChangeStreamOperationType.Update); | ||
|
|
||
| // Opens a change stream and prints the changes as they're received | ||
| using (var cursor = await _restaurantsCollection.WatchAsync(pipeline)) | ||
| using (var cursor = await collection.WatchAsync(pipeline)) | ||
| { | ||
| await cursor.ForEachAsync(change => | ||
| { | ||
|
|
@@ -56,7 +56,7 @@ await cursor.ForEachAsync(change => | |
| .Match(change => change.OperationType == ChangeStreamOperationType.Update); | ||
|
|
||
| // Opens a change streams and print the changes as they're received | ||
| using (var cursor = _restaurantsCollection.Watch(pipeline)) | ||
| using (var cursor = collection.Watch(pipeline)) | ||
| { | ||
| foreach (var change in cursor.ToEnumerable()) | ||
| { | ||
|
|
@@ -65,6 +65,115 @@ await cursor.ForEachAsync(change => | |
| } | ||
| // end-change-stream-pipeline | ||
|
|
||
| // start-split-event-helpers-sync | ||
| // Fetches the next complete change stream event | ||
| private static ChangeStreamDocument<TDocument> GetNextChangeStreamEvent<TDocument>( | ||
|
||
| IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator) | ||
| { | ||
| var changeStreamEvent = changeStreamEnumerator.Current; | ||
|
|
||
| // Reassembles change event fragments if the event is split | ||
| if (changeStreamEvent.SplitEvent != null) | ||
| { | ||
| var fragment = changeStreamEvent; | ||
| while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) | ||
| { | ||
| changeStreamEnumerator.MoveNext(); | ||
| fragment = changeStreamEnumerator.Current; | ||
| MergeFragment(changeStreamEvent, fragment); | ||
| } | ||
| } | ||
| return changeStreamEvent; | ||
| } | ||
|
|
||
| // Merges a fragment into the base event | ||
| private static void MergeFragment<TDocument>( | ||
| ChangeStreamDocument<TDocument> changeStreamEvent, | ||
| ChangeStreamDocument<TDocument> fragment) | ||
| { | ||
| foreach (var element in fragment.BackingDocument) | ||
| { | ||
| if (element.Name != "_id" && element.Name != "splitEvent") | ||
| { | ||
| changeStreamEvent.BackingDocument[element.Name] = element.Value; | ||
| } | ||
| } | ||
| } | ||
| // end-split-event-helpers-sync | ||
|
|
||
| // start-split-event-helpers-async | ||
| // Fetches the next complete change stream event | ||
| private static async Task<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>( | ||
| IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor) | ||
| { | ||
| var changeStreamEvent = changeStreamCursor.Current.First(); | ||
|
||
|
|
||
| // Reassembles change event fragments if the event is split | ||
| if (changeStreamEvent.SplitEvent != null) | ||
| { | ||
| var fragment = changeStreamEvent; | ||
| while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) | ||
| { | ||
| if (!await changeStreamCursor.MoveNextAsync()) | ||
| { | ||
| throw new InvalidOperationException("Incomplete split event fragments."); | ||
| } | ||
| fragment = changeStreamCursor.Current.First(); | ||
| MergeFragment(changeStreamEvent, fragment); | ||
| } | ||
| } | ||
| return changeStreamEvent; | ||
| } | ||
|
|
||
| // Merges a fragment into the base event | ||
| private static void MergeFragment<TDocument>( | ||
| ChangeStreamDocument<TDocument> changeStreamEvent, | ||
| ChangeStreamDocument<TDocument> fragment) | ||
| { | ||
| foreach (var element in fragment.BackingDocument) | ||
| { | ||
| if (element.Name != "_id" && element.Name != "splitEvent") | ||
| { | ||
| changeStreamEvent.BackingDocument[element.Name] = element.Value; | ||
| } | ||
| } | ||
| } | ||
| // end-split-event-helpers-async | ||
|
|
||
| // start-split-change-event-async | ||
| var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() | ||
| .ChangeStreamSplitLargeEvent(); | ||
|
|
||
| using (var cursor = await collection.WatchAsync(pipeline)) | ||
| { | ||
| while (await cursor.MoveNextAsync()) | ||
| { | ||
| foreach (var changeStreamEvent in cursor.Current) | ||
| { | ||
| var completeEvent = await GetNextChangeStreamEventAsync(cursor); | ||
| Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); | ||
| } | ||
| } | ||
| } | ||
| // end-split-change-event-async | ||
|
|
||
| // start-split-change-event-sync | ||
| var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() | ||
| .ChangeStreamSplitLargeEvent(); | ||
|
|
||
| using (var cursor = collection.Watch(pipeline)) | ||
| { | ||
| using (var enumerator = cursor.ToEnumerable().GetEnumerator()) | ||
|
||
| { | ||
| while (enumerator.MoveNext()) | ||
| { | ||
| var completeEvent = GetNextChangeStreamEvent(enumerator); | ||
| Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); | ||
| } | ||
| } | ||
| } | ||
| // end-split-change-event-sync | ||
|
|
||
| // start-change-stream-post-image | ||
| var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>() | ||
| .Match(change => change.OperationType == ChangeStreamOperationType.Update); | ||
|
|
@@ -74,7 +183,7 @@ await cursor.ForEachAsync(change => | |
| FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, | ||
| }; | ||
|
|
||
| using (var cursor = _restaurantsCollection.Watch(pipeline, options)) | ||
| using (var cursor = collection.Watch(pipeline, options)) | ||
| { | ||
| foreach (var change in cursor.ToEnumerable()) | ||
| { | ||
|
|
@@ -92,7 +201,7 @@ await cursor.ForEachAsync(change => | |
| FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, | ||
| }; | ||
|
|
||
| using var cursor = await _restaurantsCollection.WatchAsync(pipeline, options); | ||
| using var cursor = await collection.WatchAsync(pipeline, options); | ||
| await cursor.ForEachAsync(change => | ||
| { | ||
| Console.WriteLine(change.FullDocument.ToBsonDocument()); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we talk about the
$changeStreamSplitLargeEvent, but we do not explicitly say that we have it as a step in the typed aggregation api, calledChangeStreamSplitLargeEvent. We mention this only in the code at the bottom of this section. I think it would be better to make this clear from the start