Skip to content

Commit 81c53de

Browse files
authored
Fixes for 2 known cases when observer is closed with Unknown reason (#152)
* Correct observer close reason for partition not found and read session not available cases. CFP should never send Unknown reason to observer close. * Correct observer close reason for "read session not available", for that add ReadSessionNotAvailable to ObserverCloseReason. This error means that the replica that served the request doesn't have request session yet. SDK retries and throws DocumentClientExceptions with status/substatus = 404/1002 when all retries are over. Added new exception for that that is converted into corresponding observer close reason. * Bump version to 2.3.1.
1 parent 560dfbe commit 81c53de

File tree

11 files changed

+130
-13
lines changed

11 files changed

+130
-13
lines changed

src/DocumentDB.ChangeFeedProcessor.UnitTests/Exceptions/PartitionExceptionsTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ namespace Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.Exceptions
1212
using System.Threading;
1313
using System.Threading.Tasks;
1414
using Microsoft.Azure.Documents.ChangeFeedProcessor.DataAccess;
15+
using Microsoft.Azure.Documents.ChangeFeedProcessor.DocDBErrors;
1516
using Microsoft.Azure.Documents.ChangeFeedProcessor.Exceptions;
1617
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
1718
using Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.Utils;
@@ -119,15 +120,14 @@ public async Task Run_ShouldThrowNotFound_IfPartitionNotFound()
119120
}
120121

121122
[Fact]
122-
public async Task Run_ShouldReThrow_IfUnknownNotFoundSubcode()
123+
public async Task Run_ShouldRethrowReadSessionNotAvailable()
123124
{
124125
Mock.Get(documentQuery)
125126
.SetupSequence(query => query.ExecuteNextAsync<Document>(It.Is<CancellationToken>(token => token == cancellationTokenSource.Token)))
126-
.Throws(DocumentExceptionHelpers.CreateException("Microsoft.Azure.Documents.NotFoundException", 1002))
127-
.ReturnsAsync(feedResponse);
127+
.Throws(DocumentExceptionHelpers.CreateException("Microsoft.Azure.Documents.NotFoundException", (int)SubStatusCode.ReadSessionNotAvailable));
128128

129129
Exception exception = await Record.ExceptionAsync(() => partitionProcessor.RunAsync(cancellationTokenSource.Token));
130-
Assert.IsAssignableFrom<DocumentClientException>(exception);
130+
Assert.IsAssignableFrom<ReadSessionNotAvailableException>(exception);
131131
}
132132

133133
[Fact]

src/DocumentDB.ChangeFeedProcessor.UnitTests/FeedProcessor/PartitionProcessorTests.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ namespace Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.FeedProcessor
1212
using Microsoft.Azure.Documents.ChangeFeedProcessor.DataAccess;
1313
using Microsoft.Azure.Documents.ChangeFeedProcessor.Exceptions;
1414
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
15-
using Microsoft.Azure.Documents.ChangeFeedProcessor.Monitoring;
1615
using Microsoft.Azure.Documents.ChangeFeedProcessor.UnitTests.Utils;
1716
using Microsoft.Azure.Documents.Client;
1817
using Moq;
@@ -66,7 +65,12 @@ public PartitionProcessorTests()
6665

6766
observer = Mock.Of<IChangeFeedObserver>();
6867
var checkPointer = new Mock<IPartitionCheckpointer>();
69-
sut = new PartitionProcessor(new ObserverExceptionWrappingChangeFeedObserverDecorator(observer), documentQuery, new ChangeFeedOptions(), processorSettings, checkPointer.Object);
68+
sut = new PartitionProcessor(
69+
new ObserverExceptionWrappingChangeFeedObserverDecorator(observer),
70+
documentQuery,
71+
new ChangeFeedOptions(),
72+
processorSettings,
73+
checkPointer.Object);
7074
}
7175

7276
[Fact]

src/DocumentDB.ChangeFeedProcessor.UnitTests/PartitionManagement/PartitionSupervisorTests.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,40 @@ public void Dispose_ShouldWork_WithoutRun()
151151
Assert.Null(exception);
152152
}
153153

154+
[Fact]
155+
public async Task RunObserver_ResourceGoneCloseReason_IfProcessorFailedWithPartitionNotFoundException()
156+
{
157+
Mock.Get(partitionProcessor)
158+
.Setup(processor => processor.RunAsync(It.IsAny<CancellationToken>()))
159+
.ThrowsAsync(new PartitionNotFoundException("processorException", "12345"));
160+
161+
Exception exception = await Record.ExceptionAsync(() => sut.RunAsync(shutdownToken.Token)).ConfigureAwait(false);
162+
Assert.IsType<PartitionNotFoundException>(exception);
163+
Assert.Equal("processorException", exception.Message);
164+
165+
Mock.Get(observer)
166+
.Verify(feedObserver => feedObserver
167+
.CloseAsync(It.Is<ChangeFeedObserverContext>(context => context.PartitionKeyRangeId == lease.PartitionId),
168+
ChangeFeedObserverCloseReason.ResourceGone));
169+
}
170+
171+
[Fact]
172+
public async Task RunObserver_ReadSessionNotAvailableCloseReason_IfProcessorFailedWithReadSessionNotAvailableException()
173+
{
174+
Mock.Get(partitionProcessor)
175+
.Setup(processor => processor.RunAsync(It.IsAny<CancellationToken>()))
176+
.ThrowsAsync(new ReadSessionNotAvailableException("processorException", "12345"));
177+
178+
Exception exception = await Record.ExceptionAsync(() => sut.RunAsync(shutdownToken.Token)).ConfigureAwait(false);
179+
Assert.IsType<ReadSessionNotAvailableException>(exception);
180+
Assert.Equal("processorException", exception.Message);
181+
182+
Mock.Get(observer)
183+
.Verify(feedObserver => feedObserver
184+
.CloseAsync(It.Is<ChangeFeedObserverContext>(context => context.PartitionKeyRangeId == lease.PartitionId),
185+
ChangeFeedObserverCloseReason.ReadSessionNotAvailable));
186+
}
187+
154188
public void Dispose()
155189
{
156190
sut.Dispose();

src/DocumentDB.ChangeFeedProcessor/DocDBErrors/DocDbError.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ internal enum DocDbError
1111
PartitionSplit,
1212
TransientError,
1313
MaxItemCountTooLarge,
14+
ReadSessionNotAvailable,
1415
}
1516
}

src/DocumentDB.ChangeFeedProcessor/DocDBErrors/ExceptionClassifier.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ public static DocDbError ClassifyClientException(DocumentClientException clientE
1313
{
1414
SubStatusCode subStatusCode = clientException.GetSubStatusCode();
1515

16-
if (clientException.StatusCode == HttpStatusCode.NotFound && subStatusCode != SubStatusCode.ReadSessionNotAvailable)
17-
return DocDbError.PartitionNotFound;
16+
if (clientException.StatusCode == HttpStatusCode.NotFound)
17+
{
18+
return subStatusCode == SubStatusCode.ReadSessionNotAvailable ? DocDbError.ReadSessionNotAvailable : DocDbError.PartitionNotFound;
19+
}
1820

1921
if (clientException.StatusCode == HttpStatusCode.Gone && (subStatusCode == SubStatusCode.PartitionKeyRangeGone || subStatusCode == SubStatusCode.Splitting))
2022
return DocDbError.PartitionSplit;

src/DocumentDB.ChangeFeedProcessor/DocumentDB.ChangeFeedProcessor.csproj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<AssemblyName>Microsoft.Azure.Documents.ChangeFeedProcessor</AssemblyName>
2222

2323
<PackageId>Microsoft.Azure.DocumentDB.ChangeFeedProcessor</PackageId>
24-
<PackageVersion>2.3.0</PackageVersion>
24+
<PackageVersion>2.3.1</PackageVersion>
2525
<Title>Microsoft Azure Cosmos DB Change Feed Processor library</Title>
2626
<Authors>Microsoft</Authors>
2727
<PackageLicenseUrl>http://go.microsoft.com/fwlink/?LinkID=509837</PackageLicenseUrl>
@@ -44,9 +44,9 @@
4444
<!--CS1587:XML comment is not placed on a valid language element
4545
LibLog files have misplaced comments, but we cannot touch them.-->
4646
<NoWarn>1587</NoWarn>
47-
<Version>2.3.0</Version>
48-
<AssemblyVersion>2.3.0.0</AssemblyVersion>
49-
<FileVersion>2.3.0.0</FileVersion>
47+
<Version>2.3.1</Version>
48+
<AssemblyVersion>2.3.1.0</AssemblyVersion>
49+
<FileVersion>2.3.1.0</FileVersion>
5050
<PackageReleaseNotes>The change log for this project is available at https://docs.microsoft.com/azure/cosmos-db/sql-api-sdk-dotnet-changefeed.
5151
</PackageReleaseNotes>
5252
</PropertyGroup>
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//----------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation. Licensed under the MIT license.
3+
//----------------------------------------------------------------
4+
5+
namespace Microsoft.Azure.Documents.ChangeFeedProcessor.Exceptions
6+
{
7+
using System;
8+
using System.Collections.Generic;
9+
using System.Runtime.Serialization;
10+
using System.Text;
11+
12+
/// <summary>
13+
/// Exception occurred when all retries on StatusCode.NotFound/SubStatusCode.ReadSessionNotAvaialable are over.
14+
/// </summary>
15+
[Serializable]
16+
public class ReadSessionNotAvailableException : PartitionException
17+
{
18+
/// <summary>
19+
/// Initializes a new instance of the <see cref="ReadSessionNotAvailableException"/> class using error message and last continuation token.
20+
/// </summary>
21+
/// <param name="message">The exception error message.</param>
22+
/// <param name="lastContinuation"> Request continuation token.</param>
23+
public ReadSessionNotAvailableException(string message, string lastContinuation)
24+
: base(message, lastContinuation)
25+
{
26+
}
27+
28+
/// <summary>
29+
/// Initializes a new instance of the <see cref="ReadSessionNotAvailableException" /> class using error message and inner exception.
30+
/// </summary>
31+
/// <param name="message">The exception error message.</param>
32+
/// <param name="lastContinuation">The last known continuation token</param>
33+
/// <param name="innerException">The inner exception.</param>
34+
public ReadSessionNotAvailableException(string message, string lastContinuation, Exception innerException)
35+
: base(message, lastContinuation, innerException)
36+
{
37+
}
38+
39+
/// <summary>
40+
/// Initializes a new instance of the <see cref="ReadSessionNotAvailableException" /> class using default values.
41+
/// </summary>
42+
/// <param name="info">The SerializationInfo object that holds serialized object data for the exception being thrown.</param>
43+
/// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
44+
protected ReadSessionNotAvailableException(SerializationInfo info, StreamingContext context)
45+
: base(info, context)
46+
{
47+
}
48+
}
49+
}

src/DocumentDB.ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverCloseReason.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,11 @@ public enum ChangeFeedObserverCloseReason
3838
/// The lease is gone. This can be due to partition split.
3939
/// </summary>
4040
LeaseGone,
41+
42+
/// <summary>
43+
/// Indicates a "read session not available" warning related to <see cref="Microsoft.Azure.Documents.ConsistencyLevel.Session"/>.
44+
/// Note: SDK retries on this error.
45+
/// </summary>
46+
ReadSessionNotAvailable,
4147
}
4248
}

src/DocumentDB.ChangeFeedProcessor/FeedProcessing/PartitionProcessor.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public async Task RunAsync(CancellationToken cancellationToken)
4949
{
5050
IFeedResponse<Document> response = await this.query.ExecuteNextAsync<Document>(cancellationToken).ConfigureAwait(false);
5151
lastContinuation = response.ResponseContinuation;
52+
5253
if (response.Count > 0)
5354
{
5455
await this.DispatchChanges(response, cancellationToken).ConfigureAwait(false);
@@ -69,13 +70,20 @@ public async Task RunAsync(CancellationToken cancellationToken)
6970
{
7071
case DocDbError.PartitionNotFound:
7172
throw new PartitionNotFoundException("Partition not found.", lastContinuation);
73+
74+
case DocDbError.ReadSessionNotAvailable:
75+
throw new ReadSessionNotAvailableException("Read session not availalbe.", lastContinuation);
76+
7277
case DocDbError.PartitionSplit:
7378
throw new PartitionSplitException("Partition split.", lastContinuation);
79+
7480
case DocDbError.Undefined:
7581
throw;
82+
7683
case DocDbError.TransientError:
7784
// Retry on transient (429) errors
7885
break;
86+
7987
case DocDbError.MaxItemCountTooLarge:
8088
if (!this.options.MaxItemCount.HasValue)
8189
{
@@ -90,14 +98,17 @@ public async Task RunAsync(CancellationToken cancellationToken)
9098
this.options.MaxItemCount /= 2;
9199
this.logger.WarnFormat("Reducing maxItemCount, new value: {0}.", this.options.MaxItemCount);
92100
break;
101+
93102
default:
94103
this.logger.Fatal($"Unrecognized DocDbError enum value {docDbError}");
95104
Debug.Fail($"Unrecognized DocDbError enum value {docDbError}");
96105
throw;
97106
}
98107

99108
if (clientException.RetryAfter != TimeSpan.Zero)
109+
{
100110
delay = clientException.RetryAfter;
111+
}
101112
}
102113
catch (TaskCanceledException canceledException)
103114
{

src/DocumentDB.ChangeFeedProcessor/PartitionManagement/PartitionSupervisor.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,16 @@ public async Task RunAsync(CancellationToken shutdownToken)
5959
closeReason = ChangeFeedObserverCloseReason.LeaseGone;
6060
throw;
6161
}
62+
catch (PartitionNotFoundException)
63+
{
64+
closeReason = ChangeFeedObserverCloseReason.ResourceGone;
65+
throw;
66+
}
67+
catch (ReadSessionNotAvailableException)
68+
{
69+
closeReason = ChangeFeedObserverCloseReason.ReadSessionNotAvailable;
70+
throw;
71+
}
6272
catch (OperationCanceledException) when (shutdownToken.IsCancellationRequested)
6373
{
6474
closeReason = ChangeFeedObserverCloseReason.Shutdown;

0 commit comments

Comments
 (0)