diff --git a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs index 5d23960a..33f0803a 100644 --- a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs +++ b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs @@ -17,15 +17,15 @@ namespace Microsoft.Azure.ServiceBus.Amqp static class AmqpMessageConverter { + internal const string PartitionKeyName = "x-opt-partition-key"; + internal const string ViaPartitionKeyName = "x-opt-via-partition-key"; const string EnqueuedTimeUtcName = "x-opt-enqueued-time"; const string ScheduledEnqueueTimeUtcName = "x-opt-scheduled-enqueue-time"; const string SequenceNumberName = "x-opt-sequence-number"; const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number"; const string LockedUntilName = "x-opt-locked-until"; const string PublisherName = "x-opt-publisher"; - const string PartitionKeyName = "x-opt-partition-key"; const string PartitionIdName = "x-opt-partition-id"; - const string ViaPartitionKeyName = "x-opt-via-partition-key"; const string DeadLetterSourceName = "x-opt-deadletter-source"; const string TimeSpanName = AmqpConstants.Vendor + ":timespan"; const string UriName = AmqpConstants.Vendor + ":uri"; @@ -641,7 +641,7 @@ static ArraySegment StreamToBytes(Stream stream) return buffer; } - private static Data ToData(AmqpMessage message) + internal static Data ToData(AmqpMessage message) { ArraySegment[] payload = message.GetPayload(); var buffer = new BufferListStream(payload); diff --git a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs index aed01c9f..5a939d9f 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs @@ -25,6 +25,16 @@ public interface ISenderClient : IClientEntity /// Task SendAsync(IList messageList); + // TODO: extract methods into this interface for the next major version + // /// + // /// Sends a of messages to Service Bus. + // /// + // Task SendAsync(MessageBatch batch); + // /// + // /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + // /// + // Task CreateBatch(); + /// /// Schedules a message to appear on Service Bus. /// diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs new file mode 100644 index 00000000..377a253c --- /dev/null +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs @@ -0,0 +1,141 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.Azure.ServiceBus.Core +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Threading.Tasks; + using Microsoft.Azure.Amqp; + using Microsoft.Azure.Amqp.Framing; + using Microsoft.Azure.ServiceBus.Amqp; + using Microsoft.Azure.ServiceBus.Diagnostics; + + [DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")] + public class MessageBatch : IDisposable + { + internal readonly ulong maximumBatchSize; + private readonly Func> pluginsCallback; + private AmqpMessage firstMessage; + private readonly List datas; + private AmqpMessage result; + + /// + /// Construct a new batch with a maximum batch size and outgoing plugins callback. + /// + /// To construct a batch at run-time, use , , or . + /// Use this constructor for testing and custom implementations. + /// + /// + /// Maximum batch size allowed for batch. + /// Plugins callback to invoke on outgoing messages regisered with batch. + internal MessageBatch(ulong maximumBatchSize, Func> pluginsCallback) + { + this.maximumBatchSize = maximumBatchSize; + this.pluginsCallback = pluginsCallback; + this.datas = new List(); + this.result = AmqpMessage.Create(datas); + } + + /// + /// Add to the batch if the overall size of the batch with the added message is not exceeding the batch maximum. + /// + /// to add to the batch. + /// + public async Task TryAdd(Message message) + { + ThrowIfDisposed(); + + message.VerifyMessageIsNotPreviouslyReceived(); + + var processedMessage = await pluginsCallback(message); + + var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(processedMessage); + + if (firstMessage == null) + { + firstMessage = amqpMessage; + + if (processedMessage.MessageId != null) + { + result.Properties.MessageId = processedMessage.MessageId; + } + + if (processedMessage.SessionId != null) + { + result.Properties.GroupId = processedMessage.SessionId; + } + + if (processedMessage.PartitionKey != null) + { + result.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = processedMessage.PartitionKey; + } + + if (processedMessage.ViaPartitionKey != null) + { + result.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = processedMessage.ViaPartitionKey; + } + } + + var data = AmqpMessageConverter.ToData(amqpMessage); + datas.Add(data); + + if (Size <= maximumBatchSize) + { + return true; + } + + datas.Remove(data); + return false; + + } + + /// + /// Number of messages in batch. + /// + public int Length => datas.Count; + + internal ulong Size => (ulong) result.SerializedMessageSize; + + + /// + /// Convert batch to AMQP message. + /// + /// + internal AmqpMessage ToAmqpMessage() + { + ThrowIfDisposed(); + + if (datas.Count == 1) + { + firstMessage.Batchable = true; + return firstMessage; + } + + result.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat; + result.Batchable = true; + return result; + } + + public void Dispose() + { + // TODO: review if there's anything else to do + firstMessage?.Dispose(); + result?.Dispose(); + + firstMessage = null; + result = null; + } + + private void ThrowIfDisposed() + { + if (result == null) + { + throw new ObjectDisposedException("MessageBatch has been disposed and cannot be re-used."); + } + } + + private string DebuggerDisplay => $"MessageBatch: size={Size}; message count={datas.Count}; maximum size={maximumBatchSize}."; + } +} \ No newline at end of file diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 64dcf7a0..027ae595 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -15,6 +15,8 @@ namespace Microsoft.Azure.ServiceBus.Core using Microsoft.Azure.Amqp.Framing; using Microsoft.Azure.ServiceBus.Amqp; using Microsoft.Azure.ServiceBus.Primitives; + using Microsoft.Azure.ServiceBus.Diagnostics; + /// /// The MessageSender can be used to send messages to Queues or Topics. @@ -41,6 +43,7 @@ public class MessageSender : ClientEntity, IMessageSender readonly ServiceBusDiagnosticSource diagnosticSource; readonly bool isViaSender; readonly string transferDestinationPath; + private ulong maxMessageSize = 0; /// /// Creates a new AMQP MessageSender. @@ -236,12 +239,7 @@ public async Task SendAsync(IList messageList) { this.ThrowIfClosed(); - var count = MessageSender.ValidateMessages(messageList); - if (count <= 0) - { - return; - } - + var count = MessageSender.VerifyMessagesAreNotPreviouslyReceived(messageList); MessagingEventSource.Log.MessageSendStart(this.ClientId, count); bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); @@ -252,7 +250,7 @@ public async Task SendAsync(IList messageList) { var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false); - sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(processedMessages), this.OperationTimeout); + sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(processedMessages)), this.OperationTimeout); await sendTask.ConfigureAwait(false); } catch (Exception exception) @@ -273,6 +271,78 @@ public async Task SendAsync(IList messageList) MessagingEventSource.Log.MessageSendStop(this.ClientId); } + /// + /// Sends a of messages to Service Bus. + /// + public async Task SendAsync(MessageBatch messageBatch) + { + this.ThrowIfClosed(); + + MessagingEventSource.Log.MessageSendStart(this.ClientId, messageBatch.Length); + + var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); + // TODO: diagnostics (Start/Stop) is currently not possible. Requires change in how Diagnostics works. + // See https://github.com/SeanFeldman/azure-service-bus-dotnet/pull/1#issuecomment-415515524 for details. + // var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null; + Task sendTask; + + try + { + sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(messageBatch.ToAmqpMessage), this.OperationTimeout); + await sendTask.ConfigureAwait(false); + } + catch (Exception exception) + { + if (isDiagnosticSourceEnabled) + { + this.diagnosticSource.ReportException(exception); + } + + MessagingEventSource.Log.MessageSendException(this.ClientId, exception); + throw; + } + // finally + // { + // this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status); + // } + + MessagingEventSource.Log.MessageSendStop(this.ClientId); + } + + /// + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// + public async Task CreateBatch() + { + if (maxMessageSize != 0) + { + return new MessageBatch(maxMessageSize, ProcessMessage); + } + + var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true); + SendingAmqpLink amqpLink = null; + try + { + if (!this.SendLinkManager.TryGetOpenedObject(out amqpLink)) + { + amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false); + } + + if (!amqpLink.Settings.MaxMessageSize.HasValue) + { + throw new Exception("Broker didn't provide maximum message size. MessageBatch requires maximum message size to operate."); + } + + maxMessageSize = amqpLink.Settings.MaxMessageSize.Value; + + return new MessageBatch(maxMessageSize, ProcessMessage); + } + catch (Exception exception) + { + throw AmqpExceptionHelper.GetClientException(exception, amqpLink?.GetTrackingId(), null, amqpLink?.Session.IsClosing() ?? false); + } + } + /// /// Schedules a message to appear on Service Bus at a later time. /// @@ -301,7 +371,7 @@ public async Task ScheduleMessageAsync(Message message, DateTimeOffset sch } message.ScheduledEnqueueTimeUtc = scheduleEnqueueTimeUtc.UtcDateTime; - MessageSender.ValidateMessage(message); + message.VerifyMessageIsNotPreviouslyReceived(); MessagingEventSource.Log.ScheduleMessageStart(this.ClientId, scheduleEnqueueTimeUtc); long result = 0; @@ -450,7 +520,7 @@ protected override async Task OnClosingAsync() await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false); } - static int ValidateMessages(IList messageList) + static int VerifyMessagesAreNotPreviouslyReceived(IList messageList) { var count = 0; if (messageList == null) @@ -461,20 +531,12 @@ static int ValidateMessages(IList messageList) foreach (var message in messageList) { count++; - ValidateMessage(message); + message.VerifyMessageIsNotPreviouslyReceived(); } return count; } - static void ValidateMessage(Message message) - { - if (message.SystemProperties.IsLockTokenSet) - { - throw Fx.Exception.Argument(nameof(message), "Cannot send a message that was already received."); - } - } - static void CloseSession(SendingAmqpLink link) { // Note we close the session (which includes the link). @@ -526,10 +588,10 @@ async Task> ProcessMessages(IList messageList) return processedMessageList; } - async Task OnSendAsync(IList messageList) + async Task OnSendAsync(Func amqpMessageProvider) { var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true); - using (var amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messageList)) + using (var amqpMessage = amqpMessageProvider()) { SendingAmqpLink amqpLink = null; try diff --git a/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs b/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs index a7547c26..3d38191c 100644 --- a/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs +++ b/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.ServiceBus.Diagnostics using System; using System.Collections.Generic; using System.Diagnostics; + using Microsoft.Azure.ServiceBus.Primitives; public static class MessageExtensions { @@ -15,20 +16,20 @@ public static class MessageExtensions /// New with tracing context /// /// - /// Tracing context is used to correlate telemetry between producer and consumer and + /// Tracing context is used to correlate telemetry between producer and consumer and /// represented by 'Diagnostic-Id' and 'Correlation-Context' properties in . - /// + /// /// .NET SDK automatically injects context when sending message to the ServiceBus (if diagnostics is enabled by tracing system). - /// + /// /// /// 'Diagnostic-Id' uniquely identifies operation that enqueued message /// /// /// 'Correlation-Context' is comma separated list of sting key value pairs represeting optional context for the operation. /// - /// + /// /// If there is no tracing context in the message, this method returns without parent. - /// + /// /// Returned needs to be started before it can be used (see example below) /// /// @@ -39,7 +40,7 @@ public static class MessageExtensions /// var activity = message.ExtractActivity(); /// activity.Start(); /// Logger.LogInformation($"Message received, Id = {Activity.Current.Id}") - /// try + /// try /// { /// // process message /// } @@ -47,7 +48,7 @@ public static class MessageExtensions /// { /// Logger.LogError($"Exception {ex}, Id = {Activity.Current.Id}") /// } - /// finally + /// finally /// { /// activity.Stop(); /// // Activity is stopped, we no longer have it in Activity.Current, let's user activity now @@ -55,10 +56,10 @@ public static class MessageExtensions /// } /// } /// - /// - /// Note that every log is stamped with .Id, that could be used within + /// + /// Note that every log is stamped with .Id, that could be used within /// any nested method call (sync or async) - is an ambient context that flows with async method calls. - /// + /// /// public static Activity ExtractActivity(this Message message, string activityName = null) diff --git a/src/Microsoft.Azure.ServiceBus/Extensions/MessageExtensions.cs b/src/Microsoft.Azure.ServiceBus/Extensions/MessageExtensions.cs new file mode 100644 index 00000000..74ca654a --- /dev/null +++ b/src/Microsoft.Azure.ServiceBus/Extensions/MessageExtensions.cs @@ -0,0 +1,15 @@ +namespace Microsoft.Azure.ServiceBus +{ + using Microsoft.Azure.ServiceBus.Primitives; + + internal static class MessageExtensions + { + public static void VerifyMessageIsNotPreviouslyReceived(this Message message) + { + if (message.SystemProperties.IsLockTokenSet) + { + throw Fx.Exception.Argument(nameof(message), "Cannot send a message that was already received."); + } + } + } +} \ No newline at end of file diff --git a/src/Microsoft.Azure.ServiceBus/QueueClient.cs b/src/Microsoft.Azure.ServiceBus/QueueClient.cs index 145f5851..9177a724 100644 --- a/src/Microsoft.Azure.ServiceBus/QueueClient.cs +++ b/src/Microsoft.Azure.ServiceBus/QueueClient.cs @@ -89,7 +89,7 @@ public QueueClient(string connectionString, string entityPath, ReceiveMode recei { throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString); } - + this.OwnsConnection = true; } @@ -328,7 +328,7 @@ internal SessionPumpHost SessionPumpHost return this.sessionPumpHost; } } - + ICbsTokenProvider CbsTokenProvider { get; } /// @@ -349,6 +349,27 @@ public Task SendAsync(IList messageList) return this.InnerSender.SendAsync(messageList); } + /// + /// Sends a of messages to Service Bus. + /// + public Task SendAsync(MessageBatch messageBatch) + { + this.ThrowIfClosed(); + + return this.innerSender.SendAsync(messageBatch); + } + + /// + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// + public Task CreateBatch() + { + this.ThrowIfClosed(); + + return this.innerSender.CreateBatch(); + } + + /// /// Completes a using its lock token. This will delete the message from the queue. /// diff --git a/src/Microsoft.Azure.ServiceBus/TopicClient.cs b/src/Microsoft.Azure.ServiceBus/TopicClient.cs index fe9ff6be..9aec2a3c 100644 --- a/src/Microsoft.Azure.ServiceBus/TopicClient.cs +++ b/src/Microsoft.Azure.ServiceBus/TopicClient.cs @@ -164,7 +164,7 @@ internal MessageSender InnerSender return this.innerSender; } } - + ICbsTokenProvider CbsTokenProvider { get; } /// @@ -184,6 +184,26 @@ public Task SendAsync(IList messageList) return this.InnerSender.SendAsync(messageList); } + /// + /// Sends a of messages to Service Bus. + /// + public Task SendAsync(MessageBatch messageBatch) + { + this.ThrowIfClosed(); + + return this.innerSender.SendAsync(messageBatch); + } + + /// + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// + public Task CreateBatch() + { + this.ThrowIfClosed(); + + return this.innerSender.CreateBatch(); + } + /// /// Schedules a message to appear on Service Bus at a later time. /// diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 1e1bae48..a69b1b2a 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -230,6 +230,7 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary propertiesToModify = null) { } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } public System.Threading.Tasks.Task CompleteAsync(string lockToken) { } + public System.Threading.Tasks.Task CreateBatch() { } public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, System.Collections.Generic.IDictionary propertiesToModify = null) { } public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null) { } protected override System.Threading.Tasks.Task OnClosingAsync() { } @@ -241,6 +242,7 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.MessageBatch messageBatch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public sealed class QuotaExceededException : Microsoft.Azure.ServiceBus.ServiceBusException @@ -462,11 +464,13 @@ namespace Microsoft.Azure.ServiceBus public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; } public string TopicName { get; } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } + public System.Threading.Tasks.Task CreateBatch() { } protected override System.Threading.Tasks.Task OnClosingAsync() { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.MessageBatch messageBatch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public enum TransportType @@ -529,6 +533,13 @@ namespace Microsoft.Azure.ServiceBus.Core System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message); System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList); } + [System.Diagnostics.DebuggerDisplayAttribute("{DebuggerDisplay,nq}")] + public class MessageBatch : System.IDisposable + { + public int Length { get; } + public void Dispose() { } + public System.Threading.Tasks.Task TryAdd(Microsoft.Azure.ServiceBus.Message message) { } + } public class MessageReceiver : Microsoft.Azure.ServiceBus.ClientEntity, Microsoft.Azure.ServiceBus.Core.IMessageReceiver, Microsoft.Azure.ServiceBus.Core.IReceiverClient, Microsoft.Azure.ServiceBus.IClientEntity { public MessageReceiver(Microsoft.Azure.ServiceBus.ServiceBusConnectionStringBuilder connectionStringBuilder, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { } @@ -588,11 +599,13 @@ namespace Microsoft.Azure.ServiceBus.Core public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; } public string TransferDestinationPath { get; } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } + public System.Threading.Tasks.Task CreateBatch() { } protected override System.Threading.Tasks.Task OnClosingAsync() { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.MessageBatch messageBatch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public abstract class ServiceBusPlugin diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/MessageBatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/MessageBatchTests.cs new file mode 100644 index 00000000..ef21897c --- /dev/null +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/MessageBatchTests.cs @@ -0,0 +1,88 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading.Tasks; + +namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives +{ + using System; + using System.Text; + using Microsoft.Azure.ServiceBus.Core; + using Xunit; + + public class MessageBatchTests + { + private readonly Func> fakePluginsCallback = Task.FromResult; + + [Fact] + public async Task Should_return_false_when_is_about_to_exceed_max_batch_size() + { + using (var batch = new MessageBatch(1, fakePluginsCallback)) + { + var wasAdded = await batch.TryAdd(new Message(Encoding.UTF8.GetBytes("hello"))); + Assert.False(wasAdded, "Message should not have been added, but it was."); + } + } + + [Fact] + public async Task Should_throw_if_batch_disposed() + { + using (var batch = new MessageBatch(1, fakePluginsCallback)) + { + batch.Dispose(); + await Assert.ThrowsAsync(() => batch.TryAdd(new Message())); + } + } + + [Fact] + public async Task Should_throw_when_trying_to_add_an_already_received_message_to_batch() + { + using (var batch = new MessageBatch(100, fakePluginsCallback)) + { + var message = new Message("test".GetBytes()); + message.SystemProperties.LockTokenGuid = Guid.NewGuid(); + + await Assert.ThrowsAsync(() => batch.TryAdd(message)); + } + } + + [Theory] + [InlineData(1)] + [InlineData(3)] + public async Task Should_report_how_many_messages_are_in_batch(int numberOfMessages) + { + using (var batch = new MessageBatch(100, fakePluginsCallback)) + { + for (var i = 0; i < numberOfMessages; i++) + { + await batch.TryAdd(new Message()); + } + + Assert.Equal(numberOfMessages, batch.Length); + } + } + + [Fact] + public async Task Should_reflect_property_in_batch_size() + { + using (var batch = new MessageBatch(100, fakePluginsCallback)) + { + var message = new Message(); + + await batch.TryAdd(message); + + Assert.Equal((ulong)24, batch.Size); + } + + using (var batch = new MessageBatch(100, fakePluginsCallback)) + { + var message = new Message(); + message.UserProperties["custom"] = "value"; + + await batch.TryAdd(message); + + Assert.Equal((ulong)45, batch.Size); + } + } + } +} \ No newline at end of file