Skip to content

Commit f4b534d

Browse files
committed
Message error handler
1 parent 8f91ed8 commit f4b534d

File tree

9 files changed

+362
-132
lines changed

9 files changed

+362
-132
lines changed

.autover/changes/5bdfbea6-1f68-4d45-b380-4b705ed18fe2.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"Name": "AWS.Messaging",
55
"Type": "Minor",
66
"ChangelogMessages": [
7-
"Added subscriber middleware"
7+
"Added subscriber middleware with optional error handler to override result or retry execution."
88
]
99
}
1010
]

src/AWS.Messaging/Configuration/MessageBusBuilder.cs

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
using System.Collections.Concurrent;
5+
using System.Diagnostics.CodeAnalysis;
6+
using System.Linq.Expressions;
7+
using System.Reflection;
58
using AWS.Messaging.Configuration.Internal;
69
using AWS.Messaging.Publishers;
710
using AWS.Messaging.Publishers.EventBridge;
@@ -12,13 +15,11 @@
1215
using AWS.Messaging.Services.Backoff;
1316
using AWS.Messaging.Services.Backoff.Policies;
1417
using AWS.Messaging.Telemetry;
18+
using Microsoft.Extensions.Configuration;
1519
using Microsoft.Extensions.DependencyInjection;
1620
using Microsoft.Extensions.DependencyInjection.Extensions;
17-
using Microsoft.Extensions.Configuration;
1821
using Microsoft.Extensions.Logging;
1922
using Microsoft.Extensions.Logging.Abstractions;
20-
using System.Diagnostics.CodeAnalysis;
21-
using System.Reflection;
2223

2324
namespace AWS.Messaging.Configuration;
2425

@@ -104,13 +105,20 @@ private IMessageBusBuilder AddPublisher([DynamicallyAccessedMembers(DynamicallyA
104105
return this;
105106
}
106107

107-
private IMessageBusBuilder AddMessageHandler([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type handlerType, Type messageType, Func<MessageEnvelope> envelopeFactory, MethodInfo middlewareInvokeAsyncMethodInfo, string? messageTypeIdentifier = null)
108+
private IMessageBusBuilder AddMessageHandler([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type handlerType, Type messageType, Func<MessageEnvelope> envelopeFactory, HandlerInvokerDelegate handlerInvokerDelegate, string? messageTypeIdentifier = null)
108109
{
109-
var subscriberMapping = new SubscriberMapping(handlerType, messageType, envelopeFactory, middlewareInvokeAsyncMethodInfo, messageTypeIdentifier);
110+
var subscriberMapping = new SubscriberMapping(handlerType, messageType, envelopeFactory, handlerInvokerDelegate, messageTypeIdentifier);
110111
_messageConfiguration.SubscriberMappings.Add(subscriberMapping);
111112
return this;
112113
}
113114

115+
public IMessageBusBuilder AddMessageErrorHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] T>(ServiceLifetime serviceLifetime = ServiceLifetime.Singleton)
116+
where T: IMessageErrorHandler
117+
{
118+
AddAdditionalService(new ServiceDescriptor(typeof(IMessageErrorHandler), typeof(T), serviceLifetime));
119+
return this;
120+
}
121+
114122
public IMessageBusBuilder AddMiddleware<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TMiddleware>(ServiceLifetime serviceLifetime = ServiceLifetime.Singleton)
115123
where TMiddleware : class, IMiddleware
116124
{
@@ -226,24 +234,20 @@ public IMessageBusBuilder LoadConfigurationFromSettings(IConfiguration configura
226234

227235
if (settings.MessageHandlers != null)
228236
{
229-
// This is not Native AOT compatible but the method in general is marked
230-
// as not being Native AOT compatible due to loading dynamic types. So this
231-
// not being Native AOT compatible is okay.
232-
var middlewareInvokeMethod = typeof(IMiddleware).GetMethod(nameof(IMiddleware.InvokeAsync))!;
233-
234237
foreach (var messageHandler in settings.MessageHandlers)
235238
{
236239
var messageType = GetTypeFromAssemblies(callingAssembly, messageHandler.MessageType)
237240
?? throw new InvalidAppSettingsConfigurationException($"Unable to find the provided message type '{messageHandler.MessageType}'.");
238241
var handlerType = GetTypeFromAssemblies(callingAssembly, messageHandler.HandlerType)
239242
?? throw new InvalidAppSettingsConfigurationException($"Unable to find the provided message handler type '{messageHandler.HandlerType}'.");
240243

244+
var messageEnvelopeType = typeof(MessageEnvelope<>).MakeGenericType(messageType);
245+
241246
// This func is not Native AOT compatible but the method in general is marked
242247
// as not being Native AOT compatible due to loading dynamic types. So this
243248
// func not being Native AOT compatible is okay.
244249
MessageEnvelope envelopeFactory()
245250
{
246-
var messageEnvelopeType = typeof(MessageEnvelope<>).MakeGenericType(messageType);
247251
var envelope = Activator.CreateInstance(messageEnvelopeType);
248252
if (envelope == null || envelope is not MessageEnvelope)
249253
{
@@ -253,12 +257,8 @@ MessageEnvelope envelopeFactory()
253257
return (MessageEnvelope)envelope;
254258
}
255259

256-
// MakeGenericMethod is not Native AOT compatible but the method in general is marked
257-
// as not being Native AOT compatible due to loading dynamic types. So this
258-
// method not being Native AOT compatible is okay.
259-
var middlewareInvokeAsyncMethodInfo = middlewareInvokeMethod.MakeGenericMethod(messageType);
260-
261-
AddMessageHandler(handlerType, messageType, envelopeFactory, middlewareInvokeAsyncMethodInfo, messageHandler.MessageTypeIdentifier);
260+
var handlerInoker = BuildHandlerInvoker(messageType, messageEnvelopeType);
261+
AddMessageHandler(handlerType, messageType, envelopeFactory, handlerInoker, messageHandler.MessageTypeIdentifier);
262262
}
263263
}
264264

@@ -303,6 +303,29 @@ MessageEnvelope envelopeFactory()
303303
}
304304

305305
return this;
306+
307+
// This is not Native AOT compatible but the method in general is marked
308+
// as not being Native AOT compatible due to loading dynamic types. So this
309+
// func not being Native AOT compatible is okay.
310+
static HandlerInvokerDelegate BuildHandlerInvoker(Type messageType, Type messageEnvelopeType)
311+
{
312+
var invokerParam = Expression.Parameter(typeof(HandlerInvoker), "invoker");
313+
var envelopeParam = Expression.Parameter(typeof(MessageEnvelope), "envelope");
314+
var mappingParam = Expression.Parameter(typeof(SubscriberMapping), "mapping");
315+
var tokenParam = Expression.Parameter(typeof(CancellationToken), "token");
316+
317+
// invoker.InvokeAsync<T>( (MessageEnvelope<T>) envelope, mapping, token )
318+
var genericMethodDef = typeof(HandlerInvoker)
319+
.GetMethods(BindingFlags.Public | BindingFlags.Instance)
320+
.First(m => m.Name == nameof(HandlerInvoker.InvokeAsync) && m.IsGenericMethodDefinition)
321+
.GetGenericMethodDefinition();
322+
323+
var closedMethod = genericMethodDef.MakeGenericMethod(messageType);
324+
var typedEnvelope = Expression.Convert(envelopeParam, messageEnvelopeType);
325+
var call = Expression.Call(invokerParam, closedMethod, typedEnvelope, mappingParam, tokenParam);
326+
var lambda = Expression.Lambda<HandlerInvokerDelegate>(call, invokerParam, envelopeParam, mappingParam, tokenParam);
327+
return lambda.Compile();
328+
}
306329
}
307330

308331
[RequiresUnreferencedCode("This method requires loading types dynamically as defined in the configuration system.")]

src/AWS.Messaging/Configuration/SubscriberMapping.cs

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
using System.Diagnostics.CodeAnalysis;
5-
using System.Reflection;
5+
using AWS.Messaging.Services;
66

77
namespace AWS.Messaging.Configuration;
88

@@ -19,7 +19,7 @@ public class SubscriberMapping
1919
public Type MessageType { get; }
2020

2121
/// <inheritdoc/>
22-
public MethodInfo MiddlewareInvokeAsyncMethodInfo { get; }
22+
public HandlerInvokerDelegate HandlerInvoker { get; }
2323

2424
/// <inheritdoc/>
2525
public string MessageTypeIdentifier { get; }
@@ -36,10 +36,10 @@ public class SubscriberMapping
3636
/// <param name="handlerType">The type that implements <see cref="IMessageHandler{T}"/></param>
3737
/// <param name="messageType">The type that will be message data will deserialized into</param>
3838
/// <param name="envelopeFactory">Func for creating <see cref="MessageEnvelope{messageType}"/></param>
39-
/// <param name="middlewareInvokeAsyncMethodInfo"><see cref="MethodInfo"/> to invoke middleware of <see cref="MessageType"/>.</param>
39+
/// <param name="handlerInvoker">Delegate to invoke handler of <see cref="MessageType"/>.</param>
4040
/// <param name="messageTypeIdentifier">Optional message type identifier. If not set the full name of the <see cref="MessageType"/> is used.</param>
4141

42-
internal SubscriberMapping([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type handlerType, Type messageType, Func<MessageEnvelope> envelopeFactory, MethodInfo middlewareInvokeAsyncMethodInfo, string? messageTypeIdentifier = null)
42+
internal SubscriberMapping([DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] Type handlerType, Type messageType, Func<MessageEnvelope> envelopeFactory, HandlerInvokerDelegate handlerInvoker, string? messageTypeIdentifier = null)
4343
{
4444
HandlerType = handlerType;
4545
MessageType = messageType;
@@ -49,7 +49,7 @@ internal SubscriberMapping([DynamicallyAccessedMembers(DynamicallyAccessedMember
4949
messageType.FullName ?? throw new InvalidMessageTypeException("Unable to retrieve the Full Name of the provided Message Type.");
5050

5151
MessageEnvelopeFactory = envelopeFactory;
52-
MiddlewareInvokeAsyncMethodInfo = middlewareInvokeAsyncMethodInfo;
52+
HandlerInvoker = handlerInvoker;
5353
}
5454

5555
/// <summary>
@@ -67,19 +67,13 @@ static MessageEnvelope<TMessage> envelopeFactory()
6767
return new MessageEnvelope<TMessage>();
6868
}
6969

70-
return new SubscriberMapping(typeof(THandler), typeof(TMessage), envelopeFactory, MiddlewareMethodCache<TMessage>.InvokeAsyncMethod, messageTypeIdentifier);
71-
}
72-
73-
private static class MiddlewareMethodCache<T>
74-
{
75-
public static readonly MethodInfo InvokeAsyncMethod = MiddlewareMethod.OpenGenericInvokeAsyncMethod.MakeGenericMethod(typeof(T));
76-
}
70+
static Task<MessageProcessStatus> handlerInvoker(HandlerInvoker invoker, MessageEnvelope messageEnvelope, SubscriberMapping subscriberMapping, CancellationToken token = default)
71+
{
72+
return invoker.InvokeAsync((MessageEnvelope<TMessage>)messageEnvelope, subscriberMapping, token);
73+
}
7774

78-
private static class MiddlewareMethod
79-
{
80-
// resolved once, globally
81-
public static readonly MethodInfo OpenGenericInvokeAsyncMethod = typeof(IMiddleware)
82-
.GetMethods()
83-
.First(m => m.Name == nameof(IMiddleware.InvokeAsync) && m.IsGenericMethod);
75+
return new SubscriberMapping(typeof(THandler), typeof(TMessage), envelopeFactory, handlerInvoker, messageTypeIdentifier);
8476
}
8577
}
78+
79+
public delegate Task<MessageProcessStatus> HandlerInvokerDelegate(HandlerInvoker invoker, MessageEnvelope messageEnvelope, SubscriberMapping subscriberMapping, CancellationToken token = default);
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
namespace AWS.Messaging;
5+
6+
public interface IMessageErrorHandler
7+
{
8+
/// <summary>
9+
/// Handles errors that occur during message processing.
10+
/// </summary>
11+
/// <param name="messageEnvelope">The message being processed.</param>
12+
/// <param name="exception"><see cref="Exception"/> raised while processing message.</param>
13+
/// <param name="attempts">Number of attempts made at processing this message</param>
14+
/// <param name="token"><see cref="CancellationToken"/></param>
15+
/// <returns><see cref="MessageErrorHandlerResponse"/></returns>
16+
public ValueTask<MessageErrorHandlerResponse> OnHandleError<T>(MessageEnvelope<T> messageEnvelope, Exception exception, int attempts, CancellationToken token);
17+
}
18+
19+
public enum MessageErrorHandlerResponse
20+
{
21+
/// <summary>
22+
/// Failed response.
23+
/// </summary>
24+
Failed,
25+
26+
/// <summary>
27+
/// Retry the message processing in the same process.
28+
/// </summary>
29+
Retry,
30+
31+
/// <summary>
32+
/// Success response.
33+
/// </summary>
34+
Success
35+
}

0 commit comments

Comments
 (0)