From 4b7aecc085b46e42c1b6fb6b638f23b906e8b51f Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Sun, 29 Jul 2018 01:05:41 -0600 Subject: [PATCH 01/22] Initial (simple) batching implementation --- .../Amqp/AmqpMessageConverter.cs | 9 +- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 121 ++++++++++++++++++ .../Core/MessageSender.cs | 41 +++++- .../Primitives/BatchTests.cs | 33 +++++ .../SenderReceiverTests.cs | 4 +- 5 files changed, 200 insertions(+), 8 deletions(-) create mode 100644 src/Microsoft.Azure.ServiceBus/Core/Batch.cs create mode 100644 test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs diff --git a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs index 5d23960a..125371cc 100644 --- a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs +++ b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs @@ -1,6 +1,9 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Diagnostics; +using System.Linq; + namespace Microsoft.Azure.ServiceBus.Amqp { using System; @@ -23,9 +26,9 @@ static class AmqpMessageConverter const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number"; const string LockedUntilName = "x-opt-locked-until"; const string PublisherName = "x-opt-publisher"; - const string PartitionKeyName = "x-opt-partition-key"; const string PartitionIdName = "x-opt-partition-id"; - const string ViaPartitionKeyName = "x-opt-via-partition-key"; + internal const string PartitionKeyName = "x-opt-partition-key"; + internal const string ViaPartitionKeyName = "x-opt-via-partition-key"; const string DeadLetterSourceName = "x-opt-deadletter-source"; const string TimeSpanName = AmqpConstants.Vendor + ":timespan"; const string UriName = AmqpConstants.Vendor + ":uri"; @@ -641,7 +644,7 @@ static ArraySegment StreamToBytes(Stream stream) return buffer; } - private static Data ToData(AmqpMessage message) + internal static Data ToData(AmqpMessage message) { ArraySegment[] payload = message.GetPayload(); var buffer = new BufferListStream(payload); diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs new file mode 100644 index 00000000..c13817fb --- /dev/null +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -0,0 +1,121 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using Microsoft.Azure.Amqp; +using Microsoft.Azure.Amqp.Framing; +using Microsoft.Azure.ServiceBus.Amqp; + +namespace Microsoft.Azure.ServiceBus.Core +{ + [DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")] + public class Batch : IDisposable + { + private readonly long maximumBatchSize; + private AmqpMessage firstMessage; + private readonly List datas; + private AmqpMessage result; + private (string messageId, string sessionId, string partitionKey, string viaPartitionKey) originalMessageData; + + /// + /// Construct a new batch with a maximum batch size. + /// + /// Maximum batch size allowed for batch. + public Batch(long maximumBatchSize) + { + this.maximumBatchSize = maximumBatchSize; + this.datas = new List(); + this.result = AmqpMessage.Create(datas); + } + + /// + /// Add to the batch if the overall size of the batch with the added message is not exceeding the batch maximum. + /// + /// to add to the batch. + /// + public bool TryAdd(Message message) + { + ThrowIfDisposed(); + + var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message); + + if (firstMessage == null) + { + originalMessageData = (message.MessageId, message.SessionId, message.PartitionKey, message.ViaPartitionKey); + firstMessage = amqpMessage; + } + + var data = AmqpMessageConverter.ToData(amqpMessage); + datas.Add(data); + + if (Size <= maximumBatchSize) + { + return true; + } + + datas.Remove(data); + return false; + + } + + private long Size => result.SerializedMessageSize; + + /// + /// Convert batch to AMQP message + /// + /// + public AmqpMessage ToAmqpMessage() + { + ThrowIfDisposed(); + + if (datas.Count == 1) + { + firstMessage.Batchable = true; + return firstMessage; + } + + if (originalMessageData.messageId != null) + { + result.Properties.MessageId = originalMessageData.messageId; + } + + if (originalMessageData.sessionId != null) + { + result.Properties.GroupId = originalMessageData.sessionId; + } + + if (originalMessageData.partitionKey != null) + { + result.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = originalMessageData.partitionKey; + } + + if (originalMessageData.viaPartitionKey != null) + { + result.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = originalMessageData.viaPartitionKey; + } + + result.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat; + result.Batchable = true; + return result; + } + + public void Dispose() + { + // TODO: review if there's anything else to do + firstMessage?.Dispose(); + result?.Dispose(); + + firstMessage = null; + result = null; + } + + private void ThrowIfDisposed() + { + if (result == null) + { + throw new Exception("Batch is has been disposed and cannot be re-used."); + } + } + + private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count}"; + } +} \ No newline at end of file diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 64dcf7a0..8d86a366 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -252,7 +252,7 @@ public async Task SendAsync(IList messageList) { var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false); - sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(processedMessages), this.OperationTimeout); + sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(processedMessages)), this.OperationTimeout); await sendTask.ConfigureAwait(false); } catch (Exception exception) @@ -272,6 +272,41 @@ public async Task SendAsync(IList messageList) MessagingEventSource.Log.MessageSendStop(this.ClientId); } + public async Task SendAsync(Batch batch) + { + this.ThrowIfClosed(); + + //var count = MessageSender.ValidateMessages(messageList); +// MessagingEventSource.Log.MessageSendStart(this.ClientId, count); + +// var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); +// var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null; + Task sendTask = null; + + try + { + //var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false); + + sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(batch.ToAmqpMessage), this.OperationTimeout); + await sendTask.ConfigureAwait(false); + } + catch (Exception exception) + { +// if (isDiagnosticSourceEnabled) +// { +// this.diagnosticSource.ReportException(exception); +// } + + MessagingEventSource.Log.MessageSendException(this.ClientId, exception); + throw; + } + finally + { +// this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status); + } + +// MessagingEventSource.Log.MessageSendStop(this.ClientId); + } /// /// Schedules a message to appear on Service Bus at a later time. @@ -526,10 +561,10 @@ async Task> ProcessMessages(IList messageList) return processedMessageList; } - async Task OnSendAsync(IList messageList) + async Task OnSendAsync(Func amqpMessageProvider) { var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true); - using (var amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messageList)) + using (var amqpMessage = amqpMessageProvider()) { SendingAmqpLink amqpLink = null; try diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs new file mode 100644 index 00000000..f77e9b54 --- /dev/null +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -0,0 +1,33 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives +{ + using System; + using System.Text; + using Microsoft.Azure.ServiceBus.Core; + using Xunit; + + public class BatchTests + { + [Fact] + public void Should_return_false_when_is_about_to_exceed_max_batch_size() + { + using (var batch = new Batch(1)) + { + var wasAdded = batch.TryAdd(new Message(Encoding.UTF8.GetBytes("hello"))); + Assert.False(wasAdded, "Message should not have been added, but it was."); + } + } + + [Fact] + public void Should_throw_if_batch_disposed() + { + using (var batch = new Batch(1)) + { + batch.Dispose(); + Assert.Throws(() => batch.TryAdd(new Message())); + } + } + } +} \ No newline at end of file diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index 4ce85af4..ee4746dc 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -13,9 +13,9 @@ namespace Microsoft.Azure.ServiceBus.UnitTests public class SenderReceiverTests : SenderReceiverClientTestBase { - private static TimeSpan TwoSeconds = TimeSpan.FromSeconds(2); + private static readonly TimeSpan TwoSeconds = TimeSpan.FromSeconds(2); - public static IEnumerable TestPermutations => new object[][] + public static IEnumerable TestPermutations => new[] { new object[] {TestConstants.NonPartitionedQueueName}, new object[] {TestConstants.PartitionedQueueName} From 56c346ec2db86d76aef4bc091b0befc23dde1f39 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Sun, 29 Jul 2018 01:38:30 -0600 Subject: [PATCH 02/22] Updating public API --- .../API/ApiApprovals.ApproveAzureServiceBus.approved.txt | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 1e1bae48..c19be268 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -492,6 +492,14 @@ namespace Microsoft.Azure.ServiceBus } namespace Microsoft.Azure.ServiceBus.Core { + [System.Diagnostics.DebuggerDisplayAttribute("{DebuggerDisplay,nq}")] + public class Batch : System.IDisposable + { + public Batch(long maximumBatchSize) { } + public void Dispose() { } + public Microsoft.Azure.Amqp.AmqpMessage ToAmqpMessage() { } + public bool TryAdd(Microsoft.Azure.ServiceBus.Message message) { } + } public interface IMessageReceiver : Microsoft.Azure.ServiceBus.Core.IReceiverClient, Microsoft.Azure.ServiceBus.IClientEntity { long LastPeekedSequenceNumber { get; } @@ -593,6 +601,7 @@ namespace Microsoft.Azure.ServiceBus.Core public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.Batch batch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public abstract class ServiceBusPlugin From 19e50dfaa0fa91c54bfbe93c1609bfa5a4e2f1fa Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Sun, 29 Jul 2018 22:53:02 -0600 Subject: [PATCH 03/22] Modify test to remove reliance on order of messages --- .../SenderReceiverTests.cs | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index ee4746dc..1fca4264 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Linq; + namespace Microsoft.Azure.ServiceBus.UnitTests { using System; @@ -470,9 +472,25 @@ async Task MessageSenderShouldNotThrowWhenSendingEmptyCollection(string queueNam try { - await sender.SendAsync(new List()); - var message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(3)); - Assert.True(message == null, "Expected not to find any messages, but a message was received."); + var message1 = new Message(Encoding.UTF8.GetBytes("Hello Neeraj")); + var message2 = new Message(Encoding.UTF8.GetBytes("from")); + var message3 = new Message(Encoding.UTF8.GetBytes("Sean Feldman")); + + var batch = new Batch(100); + Assert.True(batch.TryAdd(message1), "Couldn't add first message"); + Assert.True(batch.TryAdd(message2), "Couldn't add second message"); + Assert.False(batch.TryAdd(message3), "Shouldn't be able to add third message"); + await sender.SendAsync(batch); + batch.Dispose(); + await sender.CloseAsync(); + + var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 2); + var bodies = receivedMessages.Select(m => Encoding.UTF8.GetString(m.Body)); + Assert.Collection(bodies, item => Assert.Contains("Hello Neeraj", item), + item => Assert.Contains("from", item)); + + var extraMessage = await TestUtility.PeekMessageAsync(receiver); + Assert.True(extraMessage == null, "Should not have any messages other than the two, but an extra message is found"); } finally { From b03695e682444c66faf184f32eea336264e0584c Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Sun, 29 Jul 2018 23:23:10 -0600 Subject: [PATCH 04/22] Adding batched send to QueueClient and TopicClient --- .../Core/ISenderClient.cs | 6 ++++++ .../Core/MessageSender.cs | 4 ++++ src/Microsoft.Azure.ServiceBus/QueueClient.cs | 14 ++++++++++++-- src/Microsoft.Azure.ServiceBus/TopicClient.cs | 12 +++++++++++- ...piApprovals.ApproveAzureServiceBus.approved.txt | 2 ++ 5 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs index aed01c9f..c66a5e94 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs @@ -25,6 +25,12 @@ public interface ISenderClient : IClientEntity /// Task SendAsync(IList messageList); +// TODO: extract method into this interface for the next major version +// /// +// /// Sends a of messages to Service Bus. +// /// +// Task SendAsync(Batch batch); + /// /// Schedules a message to appear on Service Bus. /// diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 8d86a366..2ae42e74 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -272,6 +272,10 @@ public async Task SendAsync(IList messageList) MessagingEventSource.Log.MessageSendStop(this.ClientId); } + + /// + /// Sends a of messages to Service Bus. + /// public async Task SendAsync(Batch batch) { this.ThrowIfClosed(); diff --git a/src/Microsoft.Azure.ServiceBus/QueueClient.cs b/src/Microsoft.Azure.ServiceBus/QueueClient.cs index 145f5851..79b57124 100644 --- a/src/Microsoft.Azure.ServiceBus/QueueClient.cs +++ b/src/Microsoft.Azure.ServiceBus/QueueClient.cs @@ -89,7 +89,7 @@ public QueueClient(string connectionString, string entityPath, ReceiveMode recei { throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString); } - + this.OwnsConnection = true; } @@ -328,7 +328,7 @@ internal SessionPumpHost SessionPumpHost return this.sessionPumpHost; } } - + ICbsTokenProvider CbsTokenProvider { get; } /// @@ -349,6 +349,16 @@ public Task SendAsync(IList messageList) return this.InnerSender.SendAsync(messageList); } + /// + /// Sends a of messages to Service Bus. + /// + public Task SendAsync(Batch batch) + { + this.ThrowIfClosed(); + + return this.innerSender.SendAsync(batch); + } + /// /// Completes a using its lock token. This will delete the message from the queue. /// diff --git a/src/Microsoft.Azure.ServiceBus/TopicClient.cs b/src/Microsoft.Azure.ServiceBus/TopicClient.cs index fe9ff6be..6a1d4b18 100644 --- a/src/Microsoft.Azure.ServiceBus/TopicClient.cs +++ b/src/Microsoft.Azure.ServiceBus/TopicClient.cs @@ -164,7 +164,7 @@ internal MessageSender InnerSender return this.innerSender; } } - + ICbsTokenProvider CbsTokenProvider { get; } /// @@ -184,6 +184,16 @@ public Task SendAsync(IList messageList) return this.InnerSender.SendAsync(messageList); } + /// + /// Sends a of messages to Service Bus. + /// + public Task SendAsync(Batch batch) + { + this.ThrowIfClosed(); + + return this.innerSender.SendAsync(batch); + } + /// /// Schedules a message to appear on Service Bus at a later time. /// diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index c19be268..fc63d05c 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -241,6 +241,7 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.Batch batch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public sealed class QuotaExceededException : Microsoft.Azure.ServiceBus.ServiceBusException @@ -467,6 +468,7 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.Batch batch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public enum TransportType From 76d127a285dcb9cb2fe896390eb29b132bbf19a7 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 31 Jul 2018 00:56:26 -0600 Subject: [PATCH 05/22] Remove order implied by Assert.Collection() Adding extra info to identify why the test is failing on full framework --- .../SenderReceiverTests.cs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index 1fca4264..925af95f 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -1,11 +1,10 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Linq; - namespace Microsoft.Azure.ServiceBus.UnitTests { using System; + using System.Linq; using System.Text; using System.Collections.Generic; using System.Threading; @@ -472,9 +471,9 @@ async Task MessageSenderShouldNotThrowWhenSendingEmptyCollection(string queueNam try { - var message1 = new Message(Encoding.UTF8.GetBytes("Hello Neeraj")); - var message2 = new Message(Encoding.UTF8.GetBytes("from")); - var message3 = new Message(Encoding.UTF8.GetBytes("Sean Feldman")); + var message1 = new Message("Hello Neeraj".GetBytes()); + var message2 = new Message("from".GetBytes()); + var message3 = new Message("Sean Feldman".GetBytes()); var batch = new Batch(100); Assert.True(batch.TryAdd(message1), "Couldn't add first message"); @@ -485,12 +484,12 @@ async Task MessageSenderShouldNotThrowWhenSendingEmptyCollection(string queueNam await sender.CloseAsync(); var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 2); - var bodies = receivedMessages.Select(m => Encoding.UTF8.GetString(m.Body)); - Assert.Collection(bodies, item => Assert.Contains("Hello Neeraj", item), - item => Assert.Contains("from", item)); + var bodies = receivedMessages.Select(m => m.Body.GetString()); + var bodiesArray = bodies as string[] ?? bodies.ToArray(); + Assert.True(bodiesArray.Contains("Hello Neeraj") && bodiesArray.Contains("from")); var extraMessage = await TestUtility.PeekMessageAsync(receiver); - Assert.True(extraMessage == null, "Should not have any messages other than the two, but an extra message is found"); + Assert.True(extraMessage == null, $"Should not have any messages other than the two, but an extra message is found. Body='{extraMessage?.Body.GetString()}'"); } finally { From 369a78f51a4fae5f760f11c84e182e333947b66b Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 31 Jul 2018 00:07:02 -0600 Subject: [PATCH 06/22] Validate received messages cannot be added to Batch implemented --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 3 ++ .../Core/MessageSender.cs | 25 +++++----------- .../MessageDiagnosticsExtensions.cs | 30 ++++++++++++------- .../Primitives/BatchTests.cs | 12 ++++++++ 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index c13817fb..137febc5 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -4,6 +4,7 @@ using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Framing; using Microsoft.Azure.ServiceBus.Amqp; +using Microsoft.Azure.ServiceBus.Diagnostics; namespace Microsoft.Azure.ServiceBus.Core { @@ -36,6 +37,8 @@ public bool TryAdd(Message message) { ThrowIfDisposed(); + message.VerifyMessageIsNotPreviouslyReceived(); + var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message); if (firstMessage == null) diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 2ae42e74..f8ea3fd8 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using Microsoft.Azure.ServiceBus.Diagnostics; + namespace Microsoft.Azure.ServiceBus.Core { using System; @@ -16,6 +18,7 @@ namespace Microsoft.Azure.ServiceBus.Core using Microsoft.Azure.ServiceBus.Amqp; using Microsoft.Azure.ServiceBus.Primitives; + /// /// The MessageSender can be used to send messages to Queues or Topics. /// @@ -236,12 +239,7 @@ public async Task SendAsync(IList messageList) { this.ThrowIfClosed(); - var count = MessageSender.ValidateMessages(messageList); - if (count <= 0) - { - return; - } - + var count = MessageSender.VerifyMessagesAreNotPreviouslyReceived(messageList); MessagingEventSource.Log.MessageSendStart(this.ClientId, count); bool isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); @@ -280,7 +278,6 @@ public async Task SendAsync(Batch batch) { this.ThrowIfClosed(); - //var count = MessageSender.ValidateMessages(messageList); // MessagingEventSource.Log.MessageSendStart(this.ClientId, count); // var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); @@ -340,7 +337,7 @@ public async Task ScheduleMessageAsync(Message message, DateTimeOffset sch } message.ScheduledEnqueueTimeUtc = scheduleEnqueueTimeUtc.UtcDateTime; - MessageSender.ValidateMessage(message); + message.VerifyMessageIsNotPreviouslyReceived(); MessagingEventSource.Log.ScheduleMessageStart(this.ClientId, scheduleEnqueueTimeUtc); long result = 0; @@ -489,7 +486,7 @@ protected override async Task OnClosingAsync() await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false); } - static int ValidateMessages(IList messageList) + static int VerifyMessagesAreNotPreviouslyReceived(IList messageList) { var count = 0; if (messageList == null) @@ -500,20 +497,12 @@ static int ValidateMessages(IList messageList) foreach (var message in messageList) { count++; - ValidateMessage(message); + message.VerifyMessageIsNotPreviouslyReceived(); } return count; } - static void ValidateMessage(Message message) - { - if (message.SystemProperties.IsLockTokenSet) - { - throw Fx.Exception.Argument(nameof(message), "Cannot send a message that was already received."); - } - } - static void CloseSession(SendingAmqpLink link) { // Note we close the session (which includes the link). diff --git a/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs b/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs index a7547c26..e2d6cead 100644 --- a/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs +++ b/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.ServiceBus.Diagnostics using System; using System.Collections.Generic; using System.Diagnostics; + using Microsoft.Azure.ServiceBus.Primitives; public static class MessageExtensions { @@ -15,20 +16,20 @@ public static class MessageExtensions /// New with tracing context /// /// - /// Tracing context is used to correlate telemetry between producer and consumer and + /// Tracing context is used to correlate telemetry between producer and consumer and /// represented by 'Diagnostic-Id' and 'Correlation-Context' properties in . - /// + /// /// .NET SDK automatically injects context when sending message to the ServiceBus (if diagnostics is enabled by tracing system). - /// + /// /// /// 'Diagnostic-Id' uniquely identifies operation that enqueued message /// /// /// 'Correlation-Context' is comma separated list of sting key value pairs represeting optional context for the operation. /// - /// + /// /// If there is no tracing context in the message, this method returns without parent. - /// + /// /// Returned needs to be started before it can be used (see example below) /// /// @@ -39,7 +40,7 @@ public static class MessageExtensions /// var activity = message.ExtractActivity(); /// activity.Start(); /// Logger.LogInformation($"Message received, Id = {Activity.Current.Id}") - /// try + /// try /// { /// // process message /// } @@ -47,7 +48,7 @@ public static class MessageExtensions /// { /// Logger.LogError($"Exception {ex}, Id = {Activity.Current.Id}") /// } - /// finally + /// finally /// { /// activity.Stop(); /// // Activity is stopped, we no longer have it in Activity.Current, let's user activity now @@ -55,10 +56,10 @@ public static class MessageExtensions /// } /// } /// - /// - /// Note that every log is stamped with .Id, that could be used within + /// + /// Note that every log is stamped with .Id, that could be used within /// any nested method call (sync or async) - is an ambient context that flows with async method calls. - /// + /// /// public static Activity ExtractActivity(this Message message, string activityName = null) @@ -149,5 +150,14 @@ internal static bool TryExtractContext(this Message message, out IList(() => batch.TryAdd(new Message())); } } + + [Fact] + public void Should_throw_when_trying_add_received_message_to_batch() + { + using (var batch = new Batch(100)) + { + var message = new Message("test".GetBytes()); + message.SystemProperties.LockTokenGuid = Guid.NewGuid(); + + Assert.Throws(() => batch.TryAdd(message)); + } + } } } \ No newline at end of file From 04f14899bb264dca180dcbfc9329406a4acb9c7a Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 31 Jul 2018 00:26:23 -0600 Subject: [PATCH 07/22] Cleanup --- .../Amqp/AmqpMessageConverter.cs | 3 --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 17 ++++++++++------- .../Core/MessageSender.cs | 3 +-- .../Primitives/BatchTests.cs | 2 +- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs index 125371cc..6909237c 100644 --- a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs +++ b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs @@ -1,9 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Diagnostics; -using System.Linq; - namespace Microsoft.Azure.ServiceBus.Amqp { using System; diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index 137febc5..205817fd 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -1,13 +1,16 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using Microsoft.Azure.Amqp; -using Microsoft.Azure.Amqp.Framing; -using Microsoft.Azure.ServiceBus.Amqp; -using Microsoft.Azure.ServiceBus.Diagnostics; +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. namespace Microsoft.Azure.ServiceBus.Core { + using System; + using System.Collections.Generic; + using System.Diagnostics; + using Microsoft.Azure.Amqp; + using Microsoft.Azure.Amqp.Framing; + using Microsoft.Azure.ServiceBus.Amqp; + using Microsoft.Azure.ServiceBus.Diagnostics; + [DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")] public class Batch : IDisposable { diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index f8ea3fd8..a98a1501 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -1,8 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using Microsoft.Azure.ServiceBus.Diagnostics; - namespace Microsoft.Azure.ServiceBus.Core { using System; @@ -17,6 +15,7 @@ namespace Microsoft.Azure.ServiceBus.Core using Microsoft.Azure.Amqp.Framing; using Microsoft.Azure.ServiceBus.Amqp; using Microsoft.Azure.ServiceBus.Primitives; + using Microsoft.Azure.ServiceBus.Diagnostics; /// diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs index 65a3db28..a17baf9d 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -31,7 +31,7 @@ public void Should_throw_if_batch_disposed() } [Fact] - public void Should_throw_when_trying_add_received_message_to_batch() + public void Should_throw_when_trying_to_add_an_already_received_message_to_batch() { using (var batch = new Batch(100)) { From 156dce42711dd6268bae88ec6f0e11fafdeb13ea Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Tue, 31 Jul 2018 16:26:14 -0600 Subject: [PATCH 08/22] Enable MessagingEventSource logging to comply with the rest of SendAsync methods --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 8 +++++++- .../Core/MessageSender.cs | 2 +- ...Approvals.ApproveAzureServiceBus.approved.txt | 1 + .../Primitives/BatchTests.cs | 16 ++++++++++++++++ 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index 205817fd..bfdf6582 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -63,10 +63,16 @@ public bool TryAdd(Message message) } + /// + /// Number of messages in batch. + /// + public int Length => datas.Count; + private long Size => result.SerializedMessageSize; + /// - /// Convert batch to AMQP message + /// Convert batch to AMQP message. /// /// public AmqpMessage ToAmqpMessage() diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index a98a1501..66a3334a 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -277,7 +277,7 @@ public async Task SendAsync(Batch batch) { this.ThrowIfClosed(); -// MessagingEventSource.Log.MessageSendStart(this.ClientId, count); + MessagingEventSource.Log.MessageSendStart(this.ClientId, batch.Length); // var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); // var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null; diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index fc63d05c..38a1f67c 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -498,6 +498,7 @@ namespace Microsoft.Azure.ServiceBus.Core public class Batch : System.IDisposable { public Batch(long maximumBatchSize) { } + public int Length { get; } public void Dispose() { } public Microsoft.Azure.Amqp.AmqpMessage ToAmqpMessage() { } public bool TryAdd(Microsoft.Azure.ServiceBus.Message message) { } diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs index a17baf9d..3763baf0 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -41,5 +41,21 @@ public void Should_throw_when_trying_to_add_an_already_received_message_to_batch Assert.Throws(() => batch.TryAdd(message)); } } + + [Theory] + [InlineData(1)] + [InlineData(3)] + public void Should_report_how_many_messages_are_in_batch(int numberOfMessages) + { + using (var batch = new Batch(100)) + { + for (var i = 0; i < numberOfMessages; i++) + { + batch.TryAdd(new Message()); + } + + Assert.Equal(numberOfMessages, batch.Length); + } + } } } \ No newline at end of file From 51da1aedfd28d9c789a5ac8b015432f615a3395d Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Wed, 1 Aug 2018 22:44:26 -0600 Subject: [PATCH 09/22] Verify custom properties affect Batch size (serialized bytes) --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 2 +- .../Primitives/BatchTests.cs | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index bfdf6582..97c41558 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -68,7 +68,7 @@ public bool TryAdd(Message message) /// public int Length => datas.Count; - private long Size => result.SerializedMessageSize; + internal long Size => result.SerializedMessageSize; /// diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs index 3763baf0..034512e7 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -57,5 +57,28 @@ public void Should_report_how_many_messages_are_in_batch(int numberOfMessages) Assert.Equal(numberOfMessages, batch.Length); } } + + [Fact] + public void Should_show_reflect_property_in_batch_size() + { + using (var batch = new Batch(100)) + { + var message = new Message(); + + batch.TryAdd(message); + + Assert.Equal(24, batch.Size); + } + + using (var batch = new Batch(100)) + { + var message = new Message(); + message.UserProperties["custom"] = "value"; + + batch.TryAdd(message); + + Assert.Equal(45, batch.Size); + } + } } } \ No newline at end of file From 7a60434033c6400ecb05b89b65b8104c6549466e Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 2 Aug 2018 00:31:43 -0600 Subject: [PATCH 10/22] Provide an API to create Batch initiated with supported maximum message size --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 8 +-- .../Core/ISenderClient.cs | 14 +++-- .../Core/MessageSender.cs | 35 +++++++++++++ src/Microsoft.Azure.ServiceBus/QueueClient.cs | 11 ++++ src/Microsoft.Azure.ServiceBus/TopicClient.cs | 10 ++++ ...rovals.ApproveAzureServiceBus.approved.txt | 5 +- .../Primitives/BatchTests.cs | 4 +- .../SenderReceiverTests.cs | 52 +++++++++++++++++-- 8 files changed, 124 insertions(+), 15 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index 97c41558..296b7f91 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -14,7 +14,7 @@ namespace Microsoft.Azure.ServiceBus.Core [DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")] public class Batch : IDisposable { - private readonly long maximumBatchSize; + internal readonly ulong maximumBatchSize; private AmqpMessage firstMessage; private readonly List datas; private AmqpMessage result; @@ -24,7 +24,7 @@ public class Batch : IDisposable /// Construct a new batch with a maximum batch size. /// /// Maximum batch size allowed for batch. - public Batch(long maximumBatchSize) + public Batch(ulong maximumBatchSize) { this.maximumBatchSize = maximumBatchSize; this.datas = new List(); @@ -68,7 +68,7 @@ public bool TryAdd(Message message) /// public int Length => datas.Count; - internal long Size => result.SerializedMessageSize; + internal ulong Size => (ulong) result.SerializedMessageSize; /// @@ -128,6 +128,6 @@ private void ThrowIfDisposed() } } - private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count}"; + private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count} maximum size={maximumBatchSize}"; } } \ No newline at end of file diff --git a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs index c66a5e94..69aa5ab2 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs @@ -25,11 +25,15 @@ public interface ISenderClient : IClientEntity /// Task SendAsync(IList messageList); -// TODO: extract method into this interface for the next major version -// /// -// /// Sends a of messages to Service Bus. -// /// -// Task SendAsync(Batch batch); + // TODO: extract methods into this interface for the next major version + // /// + // /// Sends a of messages to Service Bus. + // /// + // Task SendAsync(Batch batch); + // /// + // /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + // /// + // Task CreateBatch(); /// /// Schedules a message to appear on Service Bus. diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 66a3334a..c609a733 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -43,6 +43,7 @@ public class MessageSender : ClientEntity, IMessageSender readonly ServiceBusDiagnosticSource diagnosticSource; readonly bool isViaSender; readonly string transferDestinationPath; + private ulong maxMessageSize = 0; /// /// Creates a new AMQP MessageSender. @@ -308,6 +309,40 @@ public async Task SendAsync(Batch batch) // MessagingEventSource.Log.MessageSendStop(this.ClientId); } + /// + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// + public async Task CreateBatch() + { + if (maxMessageSize != 0) + { + return new Batch(maxMessageSize); + } + + var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true); + SendingAmqpLink amqpLink = null; + try + { + if (!this.SendLinkManager.TryGetOpenedObject(out amqpLink)) + { + amqpLink = await this.SendLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false); + } + + if (!amqpLink.Settings.MaxMessageSize.HasValue) + { + throw new Exception("Broker didn't provide maximum message size. Batch requires maximum message size to operate."); + } + + maxMessageSize = amqpLink.Settings.MaxMessageSize.Value; + + return new Batch(maxMessageSize); + } + catch (Exception exception) + { + throw AmqpExceptionHelper.GetClientException(exception, amqpLink?.GetTrackingId(), null, amqpLink?.Session.IsClosing() ?? false); + } + } + /// /// Schedules a message to appear on Service Bus at a later time. /// diff --git a/src/Microsoft.Azure.ServiceBus/QueueClient.cs b/src/Microsoft.Azure.ServiceBus/QueueClient.cs index 79b57124..dc9696bd 100644 --- a/src/Microsoft.Azure.ServiceBus/QueueClient.cs +++ b/src/Microsoft.Azure.ServiceBus/QueueClient.cs @@ -359,6 +359,17 @@ public Task SendAsync(Batch batch) return this.innerSender.SendAsync(batch); } + /// + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// + public Task CreateBatch() + { + this.ThrowIfClosed(); + + return this.innerSender.CreateBatch(); + } + + /// /// Completes a using its lock token. This will delete the message from the queue. /// diff --git a/src/Microsoft.Azure.ServiceBus/TopicClient.cs b/src/Microsoft.Azure.ServiceBus/TopicClient.cs index 6a1d4b18..c833add1 100644 --- a/src/Microsoft.Azure.ServiceBus/TopicClient.cs +++ b/src/Microsoft.Azure.ServiceBus/TopicClient.cs @@ -194,6 +194,16 @@ public Task SendAsync(Batch batch) return this.innerSender.SendAsync(batch); } + /// + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// + public Task CreateBatch() + { + this.ThrowIfClosed(); + + return this.innerSender.CreateBatch(); + } + /// /// Schedules a message to appear on Service Bus at a later time. /// diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 38a1f67c..c936898c 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -230,6 +230,7 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary propertiesToModify = null) { } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } public System.Threading.Tasks.Task CompleteAsync(string lockToken) { } + public System.Threading.Tasks.Task CreateBatch() { } public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, System.Collections.Generic.IDictionary propertiesToModify = null) { } public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null) { } protected override System.Threading.Tasks.Task OnClosingAsync() { } @@ -463,6 +464,7 @@ namespace Microsoft.Azure.ServiceBus public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; } public string TopicName { get; } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } + public System.Threading.Tasks.Task CreateBatch() { } protected override System.Threading.Tasks.Task OnClosingAsync() { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } @@ -497,7 +499,7 @@ namespace Microsoft.Azure.ServiceBus.Core [System.Diagnostics.DebuggerDisplayAttribute("{DebuggerDisplay,nq}")] public class Batch : System.IDisposable { - public Batch(long maximumBatchSize) { } + public Batch(ulong maximumBatchSize) { } public int Length { get; } public void Dispose() { } public Microsoft.Azure.Amqp.AmqpMessage ToAmqpMessage() { } @@ -599,6 +601,7 @@ namespace Microsoft.Azure.ServiceBus.Core public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; } public string TransferDestinationPath { get; } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } + public System.Threading.Tasks.Task CreateBatch() { } protected override System.Threading.Tasks.Task OnClosingAsync() { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs index 034512e7..9ff76ff2 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -67,7 +67,7 @@ public void Should_show_reflect_property_in_batch_size() batch.TryAdd(message); - Assert.Equal(24, batch.Size); + Assert.Equal((ulong)24, batch.Size); } using (var batch = new Batch(100)) @@ -77,7 +77,7 @@ public void Should_show_reflect_property_in_batch_size() batch.TryAdd(message); - Assert.Equal(45, batch.Size); + Assert.Equal((ulong)45, batch.Size); } } } diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index 925af95f..8661c90b 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -464,9 +464,8 @@ public async Task ClientsUseGlobalConnectionCloseFirstClientSecoundClientShouldS [Theory] [InlineData(TestConstants.NonPartitionedQueueName)] [DisplayTestMethodName] - async Task MessageSenderShouldNotThrowWhenSendingEmptyCollection(string queueName) - { - var sender = new MessageSender(TestUtility.NamespaceConnectionString, queueName); + var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); + var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 1); var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, queueName, ReceiveMode.ReceiveAndDelete); try @@ -498,5 +497,52 @@ async Task MessageSenderShouldNotThrowWhenSendingEmptyCollection(string queueNam } } + [Fact] + [DisplayTestMethodName] + public async Task Sending_batch_with_properties() + { + var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); + var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); + try + { + var message = new Message("Hello Neeraj".GetBytes()); + message.UserProperties["custom"] = "value"; + + var batch = new Batch(100); + Assert.True(batch.TryAdd(message), "Couldn't add message"); + await sender.SendAsync(batch); + batch.Dispose(); + await sender.CloseAsync(); + + var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 1); + var receivedMessage = receivedMessages.FirstOrDefault(); + Assert.NotNull(receivedMessage); + Assert.Equal("value", receivedMessage.UserProperties["custom"]); + } + finally + { + await sender.CloseAsync().ConfigureAwait(false); + await receiver.CloseAsync().ConfigureAwait(false); + } + } + + [Fact] + [DisplayTestMethodName] + public async Task Batch_should_have_maximum_size_set() + { + var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); + try + { + using (var batch = await sender.CreateBatch()) + { + Assert.True(batch.maximumBatchSize == 256 * 1024 || batch.maximumBatchSize == 1024 * 1024, + $"Maximum batch size was expected to be 256KB or 1MB, but it wasn't. Reported size: {batch.maximumBatchSize}"); + } + } + finally + { + await sender.CloseAsync().ConfigureAwait(false); + } + } } } \ No newline at end of file From 306e57e8f437a491395a5b22d35987b6602e4eb3 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 2 Aug 2018 01:23:53 -0600 Subject: [PATCH 11/22] Pass messages in Batch through outgoing plugins --- src/Microsoft.Azure.ServiceBus/Core/Batch.cs | 21 +++++-- .../Core/MessageSender.cs | 4 +- ...rovals.ApproveAzureServiceBus.approved.txt | 4 +- .../Primitives/BatchTests.cs | 34 +++++++----- .../SenderReceiverTests.cs | 55 +++++++++++++++++-- 5 files changed, 88 insertions(+), 30 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs index 296b7f91..a8835603 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/Batch.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Threading.Tasks; + namespace Microsoft.Azure.ServiceBus.Core { using System; @@ -15,18 +17,25 @@ namespace Microsoft.Azure.ServiceBus.Core public class Batch : IDisposable { internal readonly ulong maximumBatchSize; + private readonly Func> pluginsCallback; private AmqpMessage firstMessage; private readonly List datas; private AmqpMessage result; private (string messageId, string sessionId, string partitionKey, string viaPartitionKey) originalMessageData; /// - /// Construct a new batch with a maximum batch size. + /// Construct a new batch with a maximum batch size and outgoing plugins callback. + /// + /// To construct a batch at run-time, use , , or . + /// Use this constructor for testing and custom implementations. + /// /// /// Maximum batch size allowed for batch. - public Batch(ulong maximumBatchSize) + /// Plugins callback to invoke on outgoing messages regisered with batch. + public Batch(ulong maximumBatchSize, Func> pluginsCallback) { this.maximumBatchSize = maximumBatchSize; + this.pluginsCallback = pluginsCallback; this.datas = new List(); this.result = AmqpMessage.Create(datas); } @@ -36,17 +45,19 @@ public Batch(ulong maximumBatchSize) /// /// to add to the batch. /// - public bool TryAdd(Message message) + public async Task TryAdd(Message message) { ThrowIfDisposed(); message.VerifyMessageIsNotPreviouslyReceived(); - var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message); + var processedMessage = await pluginsCallback(message); + + var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(processedMessage); if (firstMessage == null) { - originalMessageData = (message.MessageId, message.SessionId, message.PartitionKey, message.ViaPartitionKey); + originalMessageData = (processedMessage.MessageId, processedMessage.SessionId, processedMessage.PartitionKey, processedMessage.ViaPartitionKey); firstMessage = amqpMessage; } diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index c609a733..8a0dc9c4 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -316,7 +316,7 @@ public async Task CreateBatch() { if (maxMessageSize != 0) { - return new Batch(maxMessageSize); + return new Batch(maxMessageSize, ProcessMessage); } var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true); @@ -335,7 +335,7 @@ public async Task CreateBatch() maxMessageSize = amqpLink.Settings.MaxMessageSize.Value; - return new Batch(maxMessageSize); + return new Batch(maxMessageSize, ProcessMessage); } catch (Exception exception) { diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index c936898c..dbc33a1b 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -499,11 +499,11 @@ namespace Microsoft.Azure.ServiceBus.Core [System.Diagnostics.DebuggerDisplayAttribute("{DebuggerDisplay,nq}")] public class Batch : System.IDisposable { - public Batch(ulong maximumBatchSize) { } + public Batch(ulong maximumBatchSize, System.Func> pluginsCallback) { } public int Length { get; } public void Dispose() { } public Microsoft.Azure.Amqp.AmqpMessage ToAmqpMessage() { } - public bool TryAdd(Microsoft.Azure.ServiceBus.Message message) { } + public System.Threading.Tasks.Task TryAdd(Microsoft.Azure.ServiceBus.Message message) { } } public interface IMessageReceiver : Microsoft.Azure.ServiceBus.Core.IReceiverClient, Microsoft.Azure.ServiceBus.IClientEntity { diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs index 9ff76ff2..5ff743d9 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Threading.Tasks; + namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives { using System; @@ -10,12 +12,14 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives public class BatchTests { + private Func> fakePluginsCallback = Task.FromResult; + [Fact] - public void Should_return_false_when_is_about_to_exceed_max_batch_size() + public async Task Should_return_false_when_is_about_to_exceed_max_batch_size() { - using (var batch = new Batch(1)) + using (var batch = new Batch(1, fakePluginsCallback)) { - var wasAdded = batch.TryAdd(new Message(Encoding.UTF8.GetBytes("hello"))); + var wasAdded = await batch.TryAdd(new Message(Encoding.UTF8.GetBytes("hello"))); Assert.False(wasAdded, "Message should not have been added, but it was."); } } @@ -23,35 +27,35 @@ public void Should_return_false_when_is_about_to_exceed_max_batch_size() [Fact] public void Should_throw_if_batch_disposed() { - using (var batch = new Batch(1)) + using (var batch = new Batch(1, fakePluginsCallback)) { batch.Dispose(); - Assert.Throws(() => batch.TryAdd(new Message())); + Assert.ThrowsAsync(() => batch.TryAdd(new Message())); } } [Fact] public void Should_throw_when_trying_to_add_an_already_received_message_to_batch() { - using (var batch = new Batch(100)) + using (var batch = new Batch(100, fakePluginsCallback)) { var message = new Message("test".GetBytes()); message.SystemProperties.LockTokenGuid = Guid.NewGuid(); - Assert.Throws(() => batch.TryAdd(message)); + Assert.ThrowsAsync(() => batch.TryAdd(message)); } } [Theory] [InlineData(1)] [InlineData(3)] - public void Should_report_how_many_messages_are_in_batch(int numberOfMessages) + public async Task Should_report_how_many_messages_are_in_batch(int numberOfMessages) { - using (var batch = new Batch(100)) + using (var batch = new Batch(100, fakePluginsCallback)) { for (var i = 0; i < numberOfMessages; i++) { - batch.TryAdd(new Message()); + await batch.TryAdd(new Message()); } Assert.Equal(numberOfMessages, batch.Length); @@ -59,23 +63,23 @@ public void Should_report_how_many_messages_are_in_batch(int numberOfMessages) } [Fact] - public void Should_show_reflect_property_in_batch_size() + public async Task Should_reflect_property_in_batch_size() { - using (var batch = new Batch(100)) + using (var batch = new Batch(100, fakePluginsCallback)) { var message = new Message(); - batch.TryAdd(message); + await batch.TryAdd(message); Assert.Equal((ulong)24, batch.Size); } - using (var batch = new Batch(100)) + using (var batch = new Batch(100, fakePluginsCallback)) { var message = new Message(); message.UserProperties["custom"] = "value"; - batch.TryAdd(message); + await batch.TryAdd(message); Assert.Equal((ulong)45, batch.Size); } diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index 8661c90b..0b492921 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -474,10 +474,10 @@ public async Task ClientsUseGlobalConnectionCloseFirstClientSecoundClientShouldS var message2 = new Message("from".GetBytes()); var message3 = new Message("Sean Feldman".GetBytes()); - var batch = new Batch(100); - Assert.True(batch.TryAdd(message1), "Couldn't add first message"); - Assert.True(batch.TryAdd(message2), "Couldn't add second message"); - Assert.False(batch.TryAdd(message3), "Shouldn't be able to add third message"); + var batch = new Batch(100, Task.FromResult); + Assert.True(await batch.TryAdd(message1), "Couldn't add first message"); + Assert.True(await batch.TryAdd(message2), "Couldn't add second message"); + Assert.False(await batch.TryAdd(message3), "Shouldn't be able to add third message"); await sender.SendAsync(batch); batch.Dispose(); await sender.CloseAsync(); @@ -508,8 +508,8 @@ public async Task Sending_batch_with_properties() var message = new Message("Hello Neeraj".GetBytes()); message.UserProperties["custom"] = "value"; - var batch = new Batch(100); - Assert.True(batch.TryAdd(message), "Couldn't add message"); + var batch = new Batch(100, Task.FromResult); + Assert.True(await batch.TryAdd(message), "Couldn't add message"); await sender.SendAsync(batch); batch.Dispose(); await sender.CloseAsync(); @@ -544,5 +544,48 @@ public async Task Batch_should_have_maximum_size_set() await sender.CloseAsync().ConfigureAwait(false); } } + + [Fact] + [DisplayTestMethodName] + public async Task Batch_should_go_through_outgoing_plugins() + { + var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); + var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); + + sender.RegisterPlugin(new RemoveVowelsPlugin()); + try + { + var batch = await sender.CreateBatch(); + await batch.TryAdd(new Message("Hello Neeraj".GetBytes())); + await batch.TryAdd(new Message("from".GetBytes())); + await batch.TryAdd(new Message("Sean Feldman".GetBytes())); + + await sender.SendAsync(batch); + batch.Dispose(); + await sender.CloseAsync(); + + var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 3); + var bodies = receivedMessages.Select(m => m.Body.GetString()); + var bodiesArray = bodies as string[] ?? bodies.ToArray(); + Assert.True(bodiesArray.Contains("Hll Nrj") && bodiesArray.Contains("frm") && bodiesArray.Contains("Sn Fldmn")); + } + finally + { + await sender.CloseAsync().ConfigureAwait(false); + await receiver.CloseAsync().ConfigureAwait(false); + } + } + + class RemoveVowelsPlugin : ServiceBusPlugin + { + public override string Name { get; } = nameof(RemoveVowelsPlugin); + + public override Task BeforeMessageSend(Message message) + { + message.Body = new string(message.Body.GetString().Where( x => "aeiouy".Contains(x) == false).ToArray()).GetBytes(); + return Task.FromResult(message); + } + } + } } \ No newline at end of file From a0933fe5f0a4a06523c986d18947ebf1eefa2f7e Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 23 Aug 2018 22:49:52 -0600 Subject: [PATCH 12/22] Report exception via diagnostics --- .../Core/MessageSender.cs | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 8a0dc9c4..79739f72 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -280,33 +280,31 @@ public async Task SendAsync(Batch batch) MessagingEventSource.Log.MessageSendStart(this.ClientId, batch.Length); -// var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); -// var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null; - Task sendTask = null; + var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); + // var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null; + Task sendTask; try { - //var processedMessages = await this.ProcessMessages(messageList).ConfigureAwait(false); - sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(batch.ToAmqpMessage), this.OperationTimeout); await sendTask.ConfigureAwait(false); } catch (Exception exception) { -// if (isDiagnosticSourceEnabled) -// { -// this.diagnosticSource.ReportException(exception); -// } + if (isDiagnosticSourceEnabled) + { + this.diagnosticSource.ReportException(exception); + } MessagingEventSource.Log.MessageSendException(this.ClientId, exception); throw; } - finally - { -// this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status); - } + // finally + // { + // this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status); + // } -// MessagingEventSource.Log.MessageSendStop(this.ClientId); + // MessagingEventSource.Log.MessageSendStop(this.ClientId); } /// From 3f50da5ab360f3dc51c7d2b33af9d55d5be7056e Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 23 Aug 2018 22:52:50 -0600 Subject: [PATCH 13/22] Add tracking TODO --- src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 79739f72..3b54a270 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -281,6 +281,8 @@ public async Task SendAsync(Batch batch) MessagingEventSource.Log.MessageSendStart(this.ClientId, batch.Length); var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); + // TODO: diagnostics (Start/Stop) is currently not possible. Requires change in how Diagnostics works. + // See https://github.com/SeanFeldman/azure-service-bus-dotnet/pull/1#issuecomment-415515524 for details. // var activity = isDiagnosticSourceEnabled ? this.diagnosticSource.SendStart(messageList) : null; Task sendTask; From a7b425da10399cd71be3b68eea9cd58b469d4727 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 4 Oct 2018 09:01:43 -0600 Subject: [PATCH 14/22] Rename Batch to MessageBatch --- .../Core/ISenderClient.cs | 8 ++++---- .../Core/{Batch.cs => MessageBatch.cs} | 13 ++++++------- .../Core/MessageSender.cs | 18 +++++++++--------- src/Microsoft.Azure.ServiceBus/QueueClient.cs | 10 +++++----- src/Microsoft.Azure.ServiceBus/TopicClient.cs | 10 +++++----- .../Primitives/BatchTests.cs | 12 ++++++------ .../SenderReceiverTests.cs | 4 ++-- 7 files changed, 37 insertions(+), 38 deletions(-) rename src/Microsoft.Azure.ServiceBus/Core/{Batch.cs => MessageBatch.cs} (91%) diff --git a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs index 69aa5ab2..5a939d9f 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/ISenderClient.cs @@ -27,13 +27,13 @@ public interface ISenderClient : IClientEntity // TODO: extract methods into this interface for the next major version // /// - // /// Sends a of messages to Service Bus. + // /// Sends a of messages to Service Bus. // /// - // Task SendAsync(Batch batch); + // Task SendAsync(MessageBatch batch); // /// - // /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + // /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. // /// - // Task CreateBatch(); + // Task CreateBatch(); /// /// Schedules a message to appear on Service Bus. diff --git a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs similarity index 91% rename from src/Microsoft.Azure.ServiceBus/Core/Batch.cs rename to src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs index a8835603..f82d3d99 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/Batch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs @@ -1,20 +1,19 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System.Threading.Tasks; - namespace Microsoft.Azure.ServiceBus.Core { using System; using System.Collections.Generic; using System.Diagnostics; + using System.Threading.Tasks; using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Framing; using Microsoft.Azure.ServiceBus.Amqp; using Microsoft.Azure.ServiceBus.Diagnostics; [DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")] - public class Batch : IDisposable + public class MessageBatch : IDisposable { internal readonly ulong maximumBatchSize; private readonly Func> pluginsCallback; @@ -32,7 +31,7 @@ public class Batch : IDisposable /// /// Maximum batch size allowed for batch. /// Plugins callback to invoke on outgoing messages regisered with batch. - public Batch(ulong maximumBatchSize, Func> pluginsCallback) + internal MessageBatch(ulong maximumBatchSize, Func> pluginsCallback) { this.maximumBatchSize = maximumBatchSize; this.pluginsCallback = pluginsCallback; @@ -86,7 +85,7 @@ public async Task TryAdd(Message message) /// Convert batch to AMQP message. /// /// - public AmqpMessage ToAmqpMessage() + internal AmqpMessage ToAmqpMessage() { ThrowIfDisposed(); @@ -135,10 +134,10 @@ private void ThrowIfDisposed() { if (result == null) { - throw new Exception("Batch is has been disposed and cannot be re-used."); + throw new Exception("MessageBatch is has been disposed and cannot be re-used."); } } - private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count} maximum size={maximumBatchSize}"; + private string DebuggerDisplay => $"MessageBatch: size={Size}, message count={datas.Count}, maximum size={maximumBatchSize}."; } } \ No newline at end of file diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 3b54a270..0fc9c346 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -272,13 +272,13 @@ public async Task SendAsync(IList messageList) } /// - /// Sends a of messages to Service Bus. + /// Sends a of messages to Service Bus. /// - public async Task SendAsync(Batch batch) + public async Task SendAsync(MessageBatch messageBatch) { this.ThrowIfClosed(); - MessagingEventSource.Log.MessageSendStart(this.ClientId, batch.Length); + MessagingEventSource.Log.MessageSendStart(this.ClientId, messageBatch.Length); var isDiagnosticSourceEnabled = ServiceBusDiagnosticSource.IsEnabled(); // TODO: diagnostics (Start/Stop) is currently not possible. Requires change in how Diagnostics works. @@ -288,7 +288,7 @@ public async Task SendAsync(Batch batch) try { - sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(batch.ToAmqpMessage), this.OperationTimeout); + sendTask = this.RetryPolicy.RunOperation(() => this.OnSendAsync(messageBatch.ToAmqpMessage), this.OperationTimeout); await sendTask.ConfigureAwait(false); } catch (Exception exception) @@ -310,13 +310,13 @@ public async Task SendAsync(Batch batch) } /// - /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. /// - public async Task CreateBatch() + public async Task CreateBatch() { if (maxMessageSize != 0) { - return new Batch(maxMessageSize, ProcessMessage); + return new MessageBatch(maxMessageSize, ProcessMessage); } var timeoutHelper = new TimeoutHelper(this.OperationTimeout, true); @@ -330,12 +330,12 @@ public async Task CreateBatch() if (!amqpLink.Settings.MaxMessageSize.HasValue) { - throw new Exception("Broker didn't provide maximum message size. Batch requires maximum message size to operate."); + throw new Exception("Broker didn't provide maximum message size. MessageBatch requires maximum message size to operate."); } maxMessageSize = amqpLink.Settings.MaxMessageSize.Value; - return new Batch(maxMessageSize, ProcessMessage); + return new MessageBatch(maxMessageSize, ProcessMessage); } catch (Exception exception) { diff --git a/src/Microsoft.Azure.ServiceBus/QueueClient.cs b/src/Microsoft.Azure.ServiceBus/QueueClient.cs index dc9696bd..9177a724 100644 --- a/src/Microsoft.Azure.ServiceBus/QueueClient.cs +++ b/src/Microsoft.Azure.ServiceBus/QueueClient.cs @@ -350,19 +350,19 @@ public Task SendAsync(IList messageList) } /// - /// Sends a of messages to Service Bus. + /// Sends a of messages to Service Bus. /// - public Task SendAsync(Batch batch) + public Task SendAsync(MessageBatch messageBatch) { this.ThrowIfClosed(); - return this.innerSender.SendAsync(batch); + return this.innerSender.SendAsync(messageBatch); } /// - /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. /// - public Task CreateBatch() + public Task CreateBatch() { this.ThrowIfClosed(); diff --git a/src/Microsoft.Azure.ServiceBus/TopicClient.cs b/src/Microsoft.Azure.ServiceBus/TopicClient.cs index c833add1..9aec2a3c 100644 --- a/src/Microsoft.Azure.ServiceBus/TopicClient.cs +++ b/src/Microsoft.Azure.ServiceBus/TopicClient.cs @@ -185,19 +185,19 @@ public Task SendAsync(IList messageList) } /// - /// Sends a of messages to Service Bus. + /// Sends a of messages to Service Bus. /// - public Task SendAsync(Batch batch) + public Task SendAsync(MessageBatch messageBatch) { this.ThrowIfClosed(); - return this.innerSender.SendAsync(batch); + return this.innerSender.SendAsync(messageBatch); } /// - /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. + /// Create a new setting maximum size to the maximum message size allowed by the underlying namespace. /// - public Task CreateBatch() + public Task CreateBatch() { this.ThrowIfClosed(); diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs index 5ff743d9..11d3137f 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs @@ -17,7 +17,7 @@ public class BatchTests [Fact] public async Task Should_return_false_when_is_about_to_exceed_max_batch_size() { - using (var batch = new Batch(1, fakePluginsCallback)) + using (var batch = new MessageBatch(1, fakePluginsCallback)) { var wasAdded = await batch.TryAdd(new Message(Encoding.UTF8.GetBytes("hello"))); Assert.False(wasAdded, "Message should not have been added, but it was."); @@ -27,7 +27,7 @@ public async Task Should_return_false_when_is_about_to_exceed_max_batch_size() [Fact] public void Should_throw_if_batch_disposed() { - using (var batch = new Batch(1, fakePluginsCallback)) + using (var batch = new MessageBatch(1, fakePluginsCallback)) { batch.Dispose(); Assert.ThrowsAsync(() => batch.TryAdd(new Message())); @@ -37,7 +37,7 @@ public void Should_throw_if_batch_disposed() [Fact] public void Should_throw_when_trying_to_add_an_already_received_message_to_batch() { - using (var batch = new Batch(100, fakePluginsCallback)) + using (var batch = new MessageBatch(100, fakePluginsCallback)) { var message = new Message("test".GetBytes()); message.SystemProperties.LockTokenGuid = Guid.NewGuid(); @@ -51,7 +51,7 @@ public void Should_throw_when_trying_to_add_an_already_received_message_to_batch [InlineData(3)] public async Task Should_report_how_many_messages_are_in_batch(int numberOfMessages) { - using (var batch = new Batch(100, fakePluginsCallback)) + using (var batch = new MessageBatch(100, fakePluginsCallback)) { for (var i = 0; i < numberOfMessages; i++) { @@ -65,7 +65,7 @@ public async Task Should_report_how_many_messages_are_in_batch(int numberOfMessa [Fact] public async Task Should_reflect_property_in_batch_size() { - using (var batch = new Batch(100, fakePluginsCallback)) + using (var batch = new MessageBatch(100, fakePluginsCallback)) { var message = new Message(); @@ -74,7 +74,7 @@ public async Task Should_reflect_property_in_batch_size() Assert.Equal((ulong)24, batch.Size); } - using (var batch = new Batch(100, fakePluginsCallback)) + using (var batch = new MessageBatch(100, fakePluginsCallback)) { var message = new Message(); message.UserProperties["custom"] = "value"; diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index 0b492921..47a1042c 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -474,7 +474,7 @@ public async Task ClientsUseGlobalConnectionCloseFirstClientSecoundClientShouldS var message2 = new Message("from".GetBytes()); var message3 = new Message("Sean Feldman".GetBytes()); - var batch = new Batch(100, Task.FromResult); + var batch = new MessageBatch(100, Task.FromResult); Assert.True(await batch.TryAdd(message1), "Couldn't add first message"); Assert.True(await batch.TryAdd(message2), "Couldn't add second message"); Assert.False(await batch.TryAdd(message3), "Shouldn't be able to add third message"); @@ -508,7 +508,7 @@ public async Task Sending_batch_with_properties() var message = new Message("Hello Neeraj".GetBytes()); message.UserProperties["custom"] = "value"; - var batch = new Batch(100, Task.FromResult); + var batch = new MessageBatch(100, Task.FromResult); Assert.True(await batch.TryAdd(message), "Couldn't add message"); await sender.SendAsync(batch); batch.Dispose(); From 8f98ee262a456eb289866916c63c53b8d8c03762 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 4 Oct 2018 09:06:23 -0600 Subject: [PATCH 15/22] Move extension method into appropreate class --- .../Extensions/MessageDiagnosticsExtensions.cs | 9 --------- .../Extensions/MessageExtensions.cs | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 9 deletions(-) create mode 100644 src/Microsoft.Azure.ServiceBus/Extensions/MessageExtensions.cs diff --git a/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs b/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs index e2d6cead..3d38191c 100644 --- a/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs +++ b/src/Microsoft.Azure.ServiceBus/Extensions/MessageDiagnosticsExtensions.cs @@ -150,14 +150,5 @@ internal static bool TryExtractContext(this Message message, out IList Date: Thu, 4 Oct 2018 09:18:20 -0600 Subject: [PATCH 16/22] Use correct exception type --- src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs index f82d3d99..7e1a1a32 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs @@ -134,7 +134,7 @@ private void ThrowIfDisposed() { if (result == null) { - throw new Exception("MessageBatch is has been disposed and cannot be re-used."); + throw new ObjectDisposedException("MessageBatch is has been disposed and cannot be re-used."); } } From 0aca4b01d623d7634ba9d786af7f41746243d3be Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 4 Oct 2018 09:21:23 -0600 Subject: [PATCH 17/22] Minor tweaks --- src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs | 4 ++-- src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs index 6909237c..33f0803a 100644 --- a/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs +++ b/src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs @@ -17,6 +17,8 @@ namespace Microsoft.Azure.ServiceBus.Amqp static class AmqpMessageConverter { + internal const string PartitionKeyName = "x-opt-partition-key"; + internal const string ViaPartitionKeyName = "x-opt-via-partition-key"; const string EnqueuedTimeUtcName = "x-opt-enqueued-time"; const string ScheduledEnqueueTimeUtcName = "x-opt-scheduled-enqueue-time"; const string SequenceNumberName = "x-opt-sequence-number"; @@ -24,8 +26,6 @@ static class AmqpMessageConverter const string LockedUntilName = "x-opt-locked-until"; const string PublisherName = "x-opt-publisher"; const string PartitionIdName = "x-opt-partition-id"; - internal const string PartitionKeyName = "x-opt-partition-key"; - internal const string ViaPartitionKeyName = "x-opt-via-partition-key"; const string DeadLetterSourceName = "x-opt-deadletter-source"; const string TimeSpanName = AmqpConstants.Vendor + ":timespan"; const string UriName = AmqpConstants.Vendor + ":uri"; diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs index 7e1a1a32..da3af954 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs @@ -134,10 +134,10 @@ private void ThrowIfDisposed() { if (result == null) { - throw new ObjectDisposedException("MessageBatch is has been disposed and cannot be re-used."); + throw new ObjectDisposedException("MessageBatch has been disposed and cannot be re-used."); } } - private string DebuggerDisplay => $"MessageBatch: size={Size}, message count={datas.Count}, maximum size={maximumBatchSize}."; + private string DebuggerDisplay => $"MessageBatch: size={Size}; message count={datas.Count}; maximum size={maximumBatchSize}."; } } \ No newline at end of file From 484e30325c51d48928ea2ff974c2c48640e1c682 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 4 Oct 2018 09:29:27 -0600 Subject: [PATCH 18/22] Ensure properties from the first message needed for a batch are included in the safe size calculation. --- .../Core/MessageBatch.cs | 42 +++++++++---------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs index da3af954..377a253c 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageBatch.cs @@ -20,7 +20,6 @@ public class MessageBatch : IDisposable private AmqpMessage firstMessage; private readonly List datas; private AmqpMessage result; - private (string messageId, string sessionId, string partitionKey, string viaPartitionKey) originalMessageData; /// /// Construct a new batch with a maximum batch size and outgoing plugins callback. @@ -56,8 +55,27 @@ public async Task TryAdd(Message message) if (firstMessage == null) { - originalMessageData = (processedMessage.MessageId, processedMessage.SessionId, processedMessage.PartitionKey, processedMessage.ViaPartitionKey); firstMessage = amqpMessage; + + if (processedMessage.MessageId != null) + { + result.Properties.MessageId = processedMessage.MessageId; + } + + if (processedMessage.SessionId != null) + { + result.Properties.GroupId = processedMessage.SessionId; + } + + if (processedMessage.PartitionKey != null) + { + result.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = processedMessage.PartitionKey; + } + + if (processedMessage.ViaPartitionKey != null) + { + result.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = processedMessage.ViaPartitionKey; + } } var data = AmqpMessageConverter.ToData(amqpMessage); @@ -95,26 +113,6 @@ internal AmqpMessage ToAmqpMessage() return firstMessage; } - if (originalMessageData.messageId != null) - { - result.Properties.MessageId = originalMessageData.messageId; - } - - if (originalMessageData.sessionId != null) - { - result.Properties.GroupId = originalMessageData.sessionId; - } - - if (originalMessageData.partitionKey != null) - { - result.MessageAnnotations.Map[AmqpMessageConverter.PartitionKeyName] = originalMessageData.partitionKey; - } - - if (originalMessageData.viaPartitionKey != null) - { - result.MessageAnnotations.Map[AmqpMessageConverter.ViaPartitionKeyName] = originalMessageData.viaPartitionKey; - } - result.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat; result.Batchable = true; return result; From c02b754ca5b14acc745dd7b1abaed37c9e0934e6 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 4 Oct 2018 09:32:26 -0600 Subject: [PATCH 19/22] Fix and rename batch test --- .../{BatchTests.cs => MessageBatchTests.cs} | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) rename test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/{BatchTests.cs => MessageBatchTests.cs} (83%) diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/MessageBatchTests.cs similarity index 83% rename from test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs rename to test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/MessageBatchTests.cs index 11d3137f..ef21897c 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/BatchTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/Primitives/MessageBatchTests.cs @@ -10,9 +10,9 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives using Microsoft.Azure.ServiceBus.Core; using Xunit; - public class BatchTests + public class MessageBatchTests { - private Func> fakePluginsCallback = Task.FromResult; + private readonly Func> fakePluginsCallback = Task.FromResult; [Fact] public async Task Should_return_false_when_is_about_to_exceed_max_batch_size() @@ -25,24 +25,24 @@ public async Task Should_return_false_when_is_about_to_exceed_max_batch_size() } [Fact] - public void Should_throw_if_batch_disposed() + public async Task Should_throw_if_batch_disposed() { using (var batch = new MessageBatch(1, fakePluginsCallback)) { batch.Dispose(); - Assert.ThrowsAsync(() => batch.TryAdd(new Message())); + await Assert.ThrowsAsync(() => batch.TryAdd(new Message())); } } [Fact] - public void Should_throw_when_trying_to_add_an_already_received_message_to_batch() + public async Task Should_throw_when_trying_to_add_an_already_received_message_to_batch() { using (var batch = new MessageBatch(100, fakePluginsCallback)) { var message = new Message("test".GetBytes()); message.SystemProperties.LockTokenGuid = Guid.NewGuid(); - Assert.ThrowsAsync(() => batch.TryAdd(message)); + await Assert.ThrowsAsync(() => batch.TryAdd(message)); } } From eff85a76589354b23962d384a4b6d0efd1af3d14 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Thu, 4 Oct 2018 21:55:29 -0600 Subject: [PATCH 20/22] Approving API --- ...rovals.ApproveAzureServiceBus.approved.txt | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index dbc33a1b..a69b1b2a 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -230,7 +230,7 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task AbandonAsync(string lockToken, System.Collections.Generic.IDictionary propertiesToModify = null) { } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } public System.Threading.Tasks.Task CompleteAsync(string lockToken) { } - public System.Threading.Tasks.Task CreateBatch() { } + public System.Threading.Tasks.Task CreateBatch() { } public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, System.Collections.Generic.IDictionary propertiesToModify = null) { } public System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null) { } protected override System.Threading.Tasks.Task OnClosingAsync() { } @@ -242,7 +242,7 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } - public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.Batch batch) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.MessageBatch messageBatch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public sealed class QuotaExceededException : Microsoft.Azure.ServiceBus.ServiceBusException @@ -464,13 +464,13 @@ namespace Microsoft.Azure.ServiceBus public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; } public string TopicName { get; } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } - public System.Threading.Tasks.Task CreateBatch() { } + public System.Threading.Tasks.Task CreateBatch() { } protected override System.Threading.Tasks.Task OnClosingAsync() { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } - public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.Batch batch) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.MessageBatch messageBatch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public enum TransportType @@ -496,15 +496,6 @@ namespace Microsoft.Azure.ServiceBus } namespace Microsoft.Azure.ServiceBus.Core { - [System.Diagnostics.DebuggerDisplayAttribute("{DebuggerDisplay,nq}")] - public class Batch : System.IDisposable - { - public Batch(ulong maximumBatchSize, System.Func> pluginsCallback) { } - public int Length { get; } - public void Dispose() { } - public Microsoft.Azure.Amqp.AmqpMessage ToAmqpMessage() { } - public System.Threading.Tasks.Task TryAdd(Microsoft.Azure.ServiceBus.Message message) { } - } public interface IMessageReceiver : Microsoft.Azure.ServiceBus.Core.IReceiverClient, Microsoft.Azure.ServiceBus.IClientEntity { long LastPeekedSequenceNumber { get; } @@ -542,6 +533,13 @@ namespace Microsoft.Azure.ServiceBus.Core System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message); System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList); } + [System.Diagnostics.DebuggerDisplayAttribute("{DebuggerDisplay,nq}")] + public class MessageBatch : System.IDisposable + { + public int Length { get; } + public void Dispose() { } + public System.Threading.Tasks.Task TryAdd(Microsoft.Azure.ServiceBus.Message message) { } + } public class MessageReceiver : Microsoft.Azure.ServiceBus.ClientEntity, Microsoft.Azure.ServiceBus.Core.IMessageReceiver, Microsoft.Azure.ServiceBus.Core.IReceiverClient, Microsoft.Azure.ServiceBus.IClientEntity { public MessageReceiver(Microsoft.Azure.ServiceBus.ServiceBusConnectionStringBuilder connectionStringBuilder, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { } @@ -601,13 +599,13 @@ namespace Microsoft.Azure.ServiceBus.Core public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; } public string TransferDestinationPath { get; } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } - public System.Threading.Tasks.Task CreateBatch() { } + public System.Threading.Tasks.Task CreateBatch() { } protected override System.Threading.Tasks.Task OnClosingAsync() { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } - public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.Batch batch) { } + public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Core.MessageBatch messageBatch) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public abstract class ServiceBusPlugin From eabe61731b6f2bfe4f6ee1fa2b58bdb51bbba012 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Fri, 26 Oct 2018 00:47:34 -0600 Subject: [PATCH 21/22] Uncomment log statement --- src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs index 0fc9c346..027ae595 100644 --- a/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs +++ b/src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs @@ -306,7 +306,7 @@ public async Task SendAsync(MessageBatch messageBatch) // this.diagnosticSource.SendStop(activity, messageList, sendTask?.Status); // } - // MessagingEventSource.Log.MessageSendStop(this.ClientId); + MessagingEventSource.Log.MessageSendStop(this.ClientId); } /// From 6e2043935cafe5c56dd190264cec6ba74279a7e4 Mon Sep 17 00:00:00 2001 From: Sean Feldman Date: Wed, 20 Mar 2019 18:34:16 -0600 Subject: [PATCH 22/22] Resolved conflict from merging v3.4.0 --- .../SenderReceiverTests.cs | 122 ++---------------- 1 file changed, 8 insertions(+), 114 deletions(-) diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs index 47a1042c..4ce85af4 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs @@ -4,7 +4,6 @@ namespace Microsoft.Azure.ServiceBus.UnitTests { using System; - using System.Linq; using System.Text; using System.Collections.Generic; using System.Threading; @@ -14,9 +13,9 @@ namespace Microsoft.Azure.ServiceBus.UnitTests public class SenderReceiverTests : SenderReceiverClientTestBase { - private static readonly TimeSpan TwoSeconds = TimeSpan.FromSeconds(2); + private static TimeSpan TwoSeconds = TimeSpan.FromSeconds(2); - public static IEnumerable TestPermutations => new[] + public static IEnumerable TestPermutations => new object[][] { new object[] {TestConstants.NonPartitionedQueueName}, new object[] {TestConstants.PartitionedQueueName} @@ -464,31 +463,16 @@ public async Task ClientsUseGlobalConnectionCloseFirstClientSecoundClientShouldS [Theory] [InlineData(TestConstants.NonPartitionedQueueName)] [DisplayTestMethodName] - var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); - var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 1); + async Task MessageSenderShouldNotThrowWhenSendingEmptyCollection(string queueName) + { + var sender = new MessageSender(TestUtility.NamespaceConnectionString, queueName); var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, queueName, ReceiveMode.ReceiveAndDelete); try { - var message1 = new Message("Hello Neeraj".GetBytes()); - var message2 = new Message("from".GetBytes()); - var message3 = new Message("Sean Feldman".GetBytes()); - - var batch = new MessageBatch(100, Task.FromResult); - Assert.True(await batch.TryAdd(message1), "Couldn't add first message"); - Assert.True(await batch.TryAdd(message2), "Couldn't add second message"); - Assert.False(await batch.TryAdd(message3), "Shouldn't be able to add third message"); - await sender.SendAsync(batch); - batch.Dispose(); - await sender.CloseAsync(); - - var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 2); - var bodies = receivedMessages.Select(m => m.Body.GetString()); - var bodiesArray = bodies as string[] ?? bodies.ToArray(); - Assert.True(bodiesArray.Contains("Hello Neeraj") && bodiesArray.Contains("from")); - - var extraMessage = await TestUtility.PeekMessageAsync(receiver); - Assert.True(extraMessage == null, $"Should not have any messages other than the two, but an extra message is found. Body='{extraMessage?.Body.GetString()}'"); + await sender.SendAsync(new List()); + var message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(3)); + Assert.True(message == null, "Expected not to find any messages, but a message was received."); } finally { @@ -497,95 +481,5 @@ public async Task ClientsUseGlobalConnectionCloseFirstClientSecoundClientShouldS } } - [Fact] - [DisplayTestMethodName] - public async Task Sending_batch_with_properties() - { - var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); - var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); - try - { - var message = new Message("Hello Neeraj".GetBytes()); - message.UserProperties["custom"] = "value"; - - var batch = new MessageBatch(100, Task.FromResult); - Assert.True(await batch.TryAdd(message), "Couldn't add message"); - await sender.SendAsync(batch); - batch.Dispose(); - await sender.CloseAsync(); - - var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 1); - var receivedMessage = receivedMessages.FirstOrDefault(); - Assert.NotNull(receivedMessage); - Assert.Equal("value", receivedMessage.UserProperties["custom"]); - } - finally - { - await sender.CloseAsync().ConfigureAwait(false); - await receiver.CloseAsync().ConfigureAwait(false); - } - } - - [Fact] - [DisplayTestMethodName] - public async Task Batch_should_have_maximum_size_set() - { - var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); - try - { - using (var batch = await sender.CreateBatch()) - { - Assert.True(batch.maximumBatchSize == 256 * 1024 || batch.maximumBatchSize == 1024 * 1024, - $"Maximum batch size was expected to be 256KB or 1MB, but it wasn't. Reported size: {batch.maximumBatchSize}"); - } - } - finally - { - await sender.CloseAsync().ConfigureAwait(false); - } - } - - [Fact] - [DisplayTestMethodName] - public async Task Batch_should_go_through_outgoing_plugins() - { - var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName); - var receiver = new MessageReceiver(TestUtility.NamespaceConnectionString, TestConstants.PartitionedQueueName, receiveMode: ReceiveMode.ReceiveAndDelete); - - sender.RegisterPlugin(new RemoveVowelsPlugin()); - try - { - var batch = await sender.CreateBatch(); - await batch.TryAdd(new Message("Hello Neeraj".GetBytes())); - await batch.TryAdd(new Message("from".GetBytes())); - await batch.TryAdd(new Message("Sean Feldman".GetBytes())); - - await sender.SendAsync(batch); - batch.Dispose(); - await sender.CloseAsync(); - - var receivedMessages = await TestUtility.ReceiveMessagesAsync(receiver, 3); - var bodies = receivedMessages.Select(m => m.Body.GetString()); - var bodiesArray = bodies as string[] ?? bodies.ToArray(); - Assert.True(bodiesArray.Contains("Hll Nrj") && bodiesArray.Contains("frm") && bodiesArray.Contains("Sn Fldmn")); - } - finally - { - await sender.CloseAsync().ConfigureAwait(false); - await receiver.CloseAsync().ConfigureAwait(false); - } - } - - class RemoveVowelsPlugin : ServiceBusPlugin - { - public override string Name { get; } = nameof(RemoveVowelsPlugin); - - public override Task BeforeMessageSend(Message message) - { - message.Body = new string(message.Body.GetString().Where( x => "aeiouy".Contains(x) == false).ToArray()).GetBytes(); - return Task.FromResult(message); - } - } - } } \ No newline at end of file