Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4b7aecc
Initial (simple) batching implementation
SeanFeldman Jul 29, 2018
56c346e
Updating public API
SeanFeldman Jul 29, 2018
19e50df
Modify test to remove reliance on order of messages
SeanFeldman Jul 30, 2018
b03695e
Adding batched send to QueueClient and TopicClient
SeanFeldman Jul 30, 2018
76d127a
Remove order implied by Assert.Collection()
SeanFeldman Jul 31, 2018
369a78f
Validate received messages cannot be added to Batch
SeanFeldman Jul 31, 2018
04f1489
Cleanup
SeanFeldman Jul 31, 2018
156dce4
Enable MessagingEventSource logging to comply with the rest of SendAs…
SeanFeldman Jul 31, 2018
51da1ae
Verify custom properties affect Batch size (serialized bytes)
SeanFeldman Aug 2, 2018
7a60434
Provide an API to create Batch initiated with supported maximum messa…
SeanFeldman Aug 2, 2018
306e57e
Pass messages in Batch through outgoing plugins
SeanFeldman Aug 2, 2018
a0933fe
Report exception via diagnostics
SeanFeldman Aug 24, 2018
3f50da5
Add tracking TODO
SeanFeldman Aug 24, 2018
a7b425d
Rename Batch to MessageBatch
SeanFeldman Oct 4, 2018
8f98ee2
Move extension method into appropreate class
SeanFeldman Oct 4, 2018
8b4e31c
Use correct exception type
SeanFeldman Oct 4, 2018
0aca4b0
Minor tweaks
SeanFeldman Oct 4, 2018
484e303
Ensure properties from the first message needed for a batch are inclu…
SeanFeldman Oct 4, 2018
c02b754
Fix and rename batch test
SeanFeldman Oct 4, 2018
eff85a7
Approving API
SeanFeldman Oct 5, 2018
eabe617
Uncomment log statement
SeanFeldman Oct 26, 2018
6e20439
Resolved conflict from merging v3.4.0
SeanFeldman Mar 21, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ namespace Microsoft.Azure.ServiceBus.Amqp

static class AmqpMessageConverter
{
internal const string PartitionKeyName = "x-opt-partition-key";
internal const string ViaPartitionKeyName = "x-opt-via-partition-key";
const string EnqueuedTimeUtcName = "x-opt-enqueued-time";
const string ScheduledEnqueueTimeUtcName = "x-opt-scheduled-enqueue-time";
const string SequenceNumberName = "x-opt-sequence-number";
const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number";
const string LockedUntilName = "x-opt-locked-until";
const string PublisherName = "x-opt-publisher";
const string PartitionKeyName = "x-opt-partition-key";
const string PartitionIdName = "x-opt-partition-id";
const string ViaPartitionKeyName = "x-opt-via-partition-key";
const string DeadLetterSourceName = "x-opt-deadletter-source";
const string TimeSpanName = AmqpConstants.Vendor + ":timespan";
const string UriName = AmqpConstants.Vendor + ":uri";
Expand Down Expand Up @@ -641,7 +641,7 @@ static ArraySegment<byte> StreamToBytes(Stream stream)
return buffer;
}

private static Data ToData(AmqpMessage message)
internal static Data ToData(AmqpMessage message)
{
ArraySegment<byte>[] payload = message.GetPayload();
var buffer = new BufferListStream(payload);
Expand Down
10 changes: 10 additions & 0 deletions src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ public interface ISenderClient : IClientEntity
/// </summary>
Task SendAsync(IList<Message> messageList);

// TODO: extract methods into this interface for the next major version
// /// <summary>
// /// Sends a <see cref="MessageBatch"/> of messages to Service Bus.
// /// </summary>
// Task SendAsync(MessageBatch batch);
// /// <summary>
// /// Create a new <see cref="MessageBatch"/> setting maximum size to the maximum message size allowed by the underlying namespace.
// /// </summary>
// Task<MessageBatch> CreateBatch();

/// <summary>
/// Schedules a message to appear on Service Bus.
/// </summary>
Expand Down
141 changes: 141 additions & 0 deletions src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.ServiceBus.Core
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.ServiceBus.Amqp;
using Microsoft.Azure.ServiceBus.Diagnostics;

[DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")]
public class MessageBatch : IDisposable
{
internal readonly ulong maximumBatchSize;
private readonly Func<Message, Task<Message>> pluginsCallback;
private AmqpMessage firstMessage;
private readonly List<Data> datas;
private AmqpMessage result;

/// <summary>
/// Construct a new batch with a maximum batch size and outgoing plugins callback.
/// <remarks>
/// To construct a batch at run-time, use <see cref="MessageSender"/>, <see cref="QueueClient"/>, or <see cref="TopicClient"/>.
/// Use this constructor for testing and custom implementations.
/// </remarks>
/// </summary>
/// <param name="maximumBatchSize">Maximum batch size allowed for batch.</param>
/// <param name="pluginsCallback">Plugins callback to invoke on outgoing messages regisered with batch.</param>
internal MessageBatch(ulong maximumBatchSize, Func<Message, Task<Message>> pluginsCallback)
{
this.maximumBatchSize = maximumBatchSize;
this.pluginsCallback = pluginsCallback;
this.datas = new List<Data>();
this.result = AmqpMessage.Create(datas);
}

/// <summary>
/// Add <see cref="Message"/> to the batch if the overall size of the batch with the added message is not exceeding the batch maximum.
/// </summary>
/// <param name="message"><see cref="Message"/> to add to the batch.</param>
/// <returns></returns>
public async Task<bool> TryAdd(Message message)
{
ThrowIfDisposed();

message.VerifyMessageIsNotPreviouslyReceived();

var processedMessage = await pluginsCallback(message);

var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(processedMessage);

if (firstMessage == null)
{
firstMessage = amqpMessage;

if (processedMessage.MessageId != null)
{
result.Properties.MessageId = processedMessage.MessageId;
}

if (processedMessage.SessionId != null)
{
result.Properties.GroupId = processedMessage.SessionId;
}

if (processedMessage.PartitionKey != null)
{
result.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = processedMessage.PartitionKey;
}

if (processedMessage.ViaPartitionKey != null)
{
result.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = processedMessage.ViaPartitionKey;
}
}

var data = AmqpMessageConverter.ToData(amqpMessage);
datas.Add(data);

if (Size <= maximumBatchSize)
{
return true;
}

datas.Remove(data);
return false;

}

/// <summary>
/// Number of messages in batch.
/// </summary>
public int Length => datas.Count;

internal ulong Size => (ulong) result.SerializedMessageSize;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nemakam I'm contemplating to make this public.
Reason: to be able to know what batch size was sent.



/// <summary>
/// Convert batch to AMQP message.
/// </summary>
/// <returns></returns>
internal AmqpMessage ToAmqpMessage()
{
ThrowIfDisposed();

if (datas.Count == 1)
{
firstMessage.Batchable = true;
return firstMessage;
}

result.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
result.Batchable = true;
return result;
}

public void Dispose()
{
// TODO: review if there's anything else to do
firstMessage?.Dispose();
result?.Dispose();

firstMessage = null;
result = null;
}

private void ThrowIfDisposed()
{
if (result == null)
{
throw new ObjectDisposedException("MessageBatch has been disposed and cannot be re-used.");
}
}

private string DebuggerDisplay => $"MessageBatch: size={Size}; message count={datas.Count}; maximum size={maximumBatchSize}.";
}
}
102 changes: 82 additions & 20 deletions src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ namespace Microsoft.Azure.ServiceBus.Core
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.ServiceBus.Amqp;
using Microsoft.Azure.ServiceBus.Primitives;
using Microsoft.Azure.ServiceBus.Diagnostics;


/// <summary>
/// The MessageSender can be used to send messages to Queues or Topics.
Expand All @@ -41,6 +43,7 @@ public class MessageSender : ClientEntity, IMessageSender
readonly ServiceBusDiagnosticSource diagnosticSource;
readonly bool isViaSender;
readonly string transferDestinationPath;
private ulong maxMessageSize = 0;

/// <summary>
/// Creates a new AMQP MessageSender.
Expand Down Expand Up @@ -236,12 +239,7 @@ public async Task SendAsync(IList<Message> messageList)
{
this.ThrowIfClosed();

var count = MessageSender.ValidateMessages(messageList);
if (count <= 0)
{
return;
}

var count = MessageSender.VerifyMessagesAreNotPreviouslyReceived(messageList);
MessagingEventSource.Log.MessageSendStart(this.ClientId, count);

bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
Expand All @@ -252,7 +250,7 @@ public async Task SendAsync(IList<Message> messageList)
{
var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false);

sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(processedMessages), this.OperationTimeout);
sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(processedMessages)), this.OperationTimeout);
await sendTask.ConfigureAwait(false);
}
catch (Exception exception)
Expand All @@ -273,6 +271,78 @@ public async Task SendAsync(IList<Message> messageList)
MessagingEventSource.Log.MessageSendStop(this.ClientId);
}

/// <summary>
/// Sends a <see cref="MessageBatch"/> of messages to Service Bus.
/// </summary>
public async Task SendAsync(MessageBatch messageBatch)
{
this.ThrowIfClosed();

MessagingEventSource.Log.MessageSendStart(this.ClientId, messageBatch.Length);

var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled();
// TODO: diagnostics (Start/Stop) is currently not possible. Requires change in how Diagnostics works.
// See https://github.com/SeanFeldman/azure-service-bus-dotnet/pull/1#issuecomment-415515524 for details.
// var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null;
Task sendTask;

try
{
sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(messageBatch.ToAmqpMessage), this.OperationTimeout);
await sendTask.ConfigureAwait(false);
}
catch (Exception exception)
{
if (isDiagnosticSourceEnabled)
{
this.diagnosticSource.ReportException(exception);
}

MessagingEventSource.Log.MessageSendException(this.ClientId, exception);
throw;
}
// finally
// {
// this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status);
// }

MessagingEventSource.Log.MessageSendStop(this.ClientId);
}

/// <summary>
/// Create a new <see cref="MessageBatch"/> setting maximum size to the maximum message size allowed by the underlying namespace.
/// </summary>
public async Task<MessageBatch> CreateBatch()
{
if (maxMessageSize != 0)
{
return new MessageBatch(maxMessageSize, ProcessMessage);
}

var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
SendingAmqpLink amqpLink = null;
try
{
if (!this.SendLinkManager.TryGetOpenedObject(out amqpLink))
{
amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
}

if (!amqpLink.Settings.MaxMessageSize.HasValue)
{
throw new Exception("Broker didn't provide maximum message size. MessageBatch requires maximum message size to operate.");
}

maxMessageSize = amqpLink.Settings.MaxMessageSize.Value;

return new MessageBatch(maxMessageSize, ProcessMessage);
}
catch (Exception exception)
{
throw AmqpExceptionHelper.GetClientException(exception, amqpLink?.GetTrackingId(), null, amqpLink?.Session.IsClosing() ?? false);
}
}

/// <summary>
/// Schedules a message to appear on Service Bus at a later time.
/// </summary>
Expand Down Expand Up @@ -301,7 +371,7 @@ public async Task<long> ScheduleMessageAsync(Message message, DateTimeOffset sch
}

message.ScheduledEnqueueTimeUtc = scheduleEnqueueTimeUtc.UtcDateTime;
MessageSender.ValidateMessage(message);
message.VerifyMessageIsNotPreviouslyReceived();
MessagingEventSource.Log.ScheduleMessageStart(this.ClientId, scheduleEnqueueTimeUtc);
long result = 0;

Expand Down Expand Up @@ -450,7 +520,7 @@ protected override async Task OnClosingAsync()
await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);
}

static int ValidateMessages(IList<Message> messageList)
static int VerifyMessagesAreNotPreviouslyReceived(IList<Message> messageList)
{
var count = 0;
if (messageList == null)
Expand All @@ -461,20 +531,12 @@ static int ValidateMessages(IList<Message> messageList)
foreach (var message in messageList)
{
count++;
ValidateMessage(message);
message.VerifyMessageIsNotPreviouslyReceived();
}

return count;
}

static void ValidateMessage(Message message)
{
if (message.SystemProperties.IsLockTokenSet)
{
throw Fx.Exception.Argument(nameof(message), "Cannot send a message that was already received.");
}
}

static void CloseSession(SendingAmqpLink link)
{
// Note we close the session (which includes the link).
Expand Down Expand Up @@ -526,10 +588,10 @@ async Task<IList<Message>> ProcessMessages(IList<Message> messageList)
return processedMessageList;
}

async Task OnSendAsync(IList<Message> messageList)
async Task OnSendAsync(Func<AmqpMessage> amqpMessageProvider)
{
var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true);
using (var amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messageList))
using (var amqpMessage = amqpMessageProvider())
{
SendingAmqpLink amqpLink = null;
try
Expand Down
Loading