@@ -19,6 +19,8 @@ namespace MQTTnet.Extensions.ManagedClient
1919{
2020 public sealed class ManagedMqttClient : Disposable , IManagedMqttClient
2121 {
22+ readonly MqttNetSourceLogger _logger ;
23+
2224 readonly AsyncEvent < InterceptingPublishMessageEventArgs > _interceptingPublishMessageEvent = new AsyncEvent < InterceptingPublishMessageEventArgs > ( ) ;
2325 readonly AsyncEvent < ApplicationMessageProcessedEventArgs > _applicationMessageProcessedEvent = new AsyncEvent < ApplicationMessageProcessedEventArgs > ( ) ;
2426 readonly AsyncEvent < ApplicationMessageSkippedEventArgs > _applicationMessageSkippedEvent = new AsyncEvent < ApplicationMessageSkippedEventArgs > ( ) ;
@@ -27,9 +29,7 @@ public sealed class ManagedMqttClient : Disposable, IManagedMqttClient
2729 readonly AsyncEvent < ManagedProcessFailedEventArgs > _synchronizingSubscriptionsFailedEvent = new AsyncEvent < ManagedProcessFailedEventArgs > ( ) ;
2830 readonly AsyncEvent < SubscriptionsChangedEventArgs > _subscriptionsChangedEvent = new AsyncEvent < SubscriptionsChangedEventArgs > ( ) ;
2931
30- readonly MqttNetSourceLogger _logger ;
3132 readonly BlockingQueue < ManagedMqttApplicationMessage > _messageQueue = new BlockingQueue < ManagedMqttApplicationMessage > ( ) ;
32-
3333 readonly AsyncLock _messageQueueLock = new AsyncLock ( ) ;
3434
3535 /// <summary>
@@ -83,21 +83,19 @@ public event Func<ApplicationMessageSkippedEventArgs, Task> ApplicationMessageSk
8383 add => _applicationMessageSkippedEvent . AddHandler ( value ) ;
8484 remove => _applicationMessageSkippedEvent . RemoveHandler ( value ) ;
8585 }
86-
86+
8787 public event Func < ApplicationMessageProcessedEventArgs , Task > ApplicationMessageProcessedAsync
8888 {
8989 add => _applicationMessageProcessedEvent . AddHandler ( value ) ;
9090 remove => _applicationMessageProcessedEvent . RemoveHandler ( value ) ;
9191 }
9292
93-
9493 public event Func < InterceptingPublishMessageEventArgs , Task > InterceptPublishMessageAsync
9594 {
9695 add => _interceptingPublishMessageEvent . AddHandler ( value ) ;
9796 remove => _interceptingPublishMessageEvent . RemoveHandler ( value ) ;
9897 }
9998
100-
10199 public event Func < MqttApplicationMessageReceivedEventArgs , Task > ApplicationMessageReceivedAsync
102100 {
103101 add => InternalClient . ApplicationMessageReceivedAsync += value ;
@@ -149,7 +147,7 @@ public event Func<SubscriptionsChangedEventArgs, Task> SubscriptionsChangedAsync
149147 public ManagedMqttClientOptions Options { get ; private set ; }
150148
151149 public int PendingApplicationMessagesCount => _messageQueue . Count ;
152-
150+
153151 public async Task EnqueueAsync ( MqttApplicationMessage applicationMessage )
154152 {
155153 ThrowIfDisposed ( ) ;
@@ -277,7 +275,7 @@ public async Task StopAsync(bool cleanDisconnect = true)
277275 ThrowIfDisposed ( ) ;
278276
279277 _isCleanDisconnect = cleanDisconnect ;
280-
278+
281279 StopPublishing ( ) ;
282280 StopMaintainingConnection ( ) ;
283281
@@ -369,6 +367,13 @@ static TimeSpan GetRemainingTime(DateTime endTime)
369367 return remainingTime < TimeSpan . Zero ? TimeSpan . Zero : remainingTime ;
370368 }
371369
370+ CancellationTokenSource NewTimeoutToken ( CancellationToken linkedToken )
371+ {
372+ var newTimeoutToken = CancellationTokenSource . CreateLinkedTokenSource ( linkedToken ) ;
373+ newTimeoutToken . CancelAfter ( Options . ClientOptions . Timeout ) ;
374+ return newTimeoutToken ;
375+ }
376+
372377 async Task HandleSubscriptionExceptionAsync ( Exception exception , List < MqttTopicFilter > addedSubscriptions , List < string > removedSubscriptions )
373378 {
374379 _logger . Warning ( exception , "Synchronizing subscriptions failed." ) ;
@@ -411,7 +416,7 @@ async Task MaintainConnectionAsync(CancellationToken cancellationToken)
411416 {
412417 if ( _isCleanDisconnect )
413418 {
414- using ( var disconnectTimeout = new CancellationTokenSource ( Options . ClientOptions . Timeout ) )
419+ using ( var disconnectTimeout = NewTimeoutToken ( CancellationToken . None ) )
415420 {
416421 await InternalClient . DisconnectAsync ( new MqttClientDisconnectOptions ( ) , disconnectTimeout . Token ) . ConfigureAwait ( false ) ;
417422 }
@@ -461,7 +466,7 @@ async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
461466
462467 cancellationToken . ThrowIfCancellationRequested ( ) ;
463468
464- await TryPublishQueuedMessageAsync ( message ) . ConfigureAwait ( false ) ;
469+ await TryPublishQueuedMessageAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
465470 }
466471 }
467472 catch ( OperationCanceledException )
@@ -477,7 +482,7 @@ async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
477482 }
478483 }
479484
480- async Task PublishReconnectSubscriptionsAsync ( )
485+ async Task PublishReconnectSubscriptionsAsync ( CancellationToken cancellationToken )
481486 {
482487 _logger . Info ( "Publishing subscriptions at reconnect" ) ;
483488
@@ -489,20 +494,20 @@ async Task PublishReconnectSubscriptionsAsync()
489494 {
490495 topicFilters = new List < MqttTopicFilter > ( ) ;
491496 SendSubscribeUnsubscribeResult subscribeUnsubscribeResult ;
492-
497+
493498 foreach ( var sub in _reconnectSubscriptions )
494499 {
495500 topicFilters . Add ( sub . Value ) ;
496501
497502 if ( topicFilters . Count == Options . MaxTopicFiltersInSubscribeUnsubscribePackets )
498503 {
499- subscribeUnsubscribeResult = await SendSubscribeUnsubscribe ( topicFilters , null ) . ConfigureAwait ( false ) ;
504+ subscribeUnsubscribeResult = await SendSubscribeUnsubscribe ( topicFilters , null , cancellationToken ) . ConfigureAwait ( false ) ;
500505 topicFilters . Clear ( ) ;
501506 await HandleSubscriptionsResultAsync ( subscribeUnsubscribeResult ) . ConfigureAwait ( false ) ;
502507 }
503508 }
504509
505- subscribeUnsubscribeResult = await SendSubscribeUnsubscribe ( topicFilters , null ) . ConfigureAwait ( false ) ;
510+ subscribeUnsubscribeResult = await SendSubscribeUnsubscribe ( topicFilters , null , cancellationToken ) . ConfigureAwait ( false ) ;
506511 await HandleSubscriptionsResultAsync ( subscribeUnsubscribeResult ) . ConfigureAwait ( false ) ;
507512 }
508513 }
@@ -555,13 +560,13 @@ async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancell
555560
556561 if ( addedTopicFilters . Count == Options . MaxTopicFiltersInSubscribeUnsubscribePackets )
557562 {
558- subscribeUnsubscribeResult = await SendSubscribeUnsubscribe ( addedTopicFilters , null ) . ConfigureAwait ( false ) ;
563+ subscribeUnsubscribeResult = await SendSubscribeUnsubscribe ( addedTopicFilters , null , cancellationToken ) . ConfigureAwait ( false ) ;
559564 addedTopicFilters . Clear ( ) ;
560565 await HandleSubscriptionsResultAsync ( subscribeUnsubscribeResult ) . ConfigureAwait ( false ) ;
561566 }
562567 }
563-
564- subscribeUnsubscribeResult = await SendSubscribeUnsubscribe ( addedTopicFilters , null ) . ConfigureAwait ( false ) ;
568+
569+ subscribeUnsubscribeResult = await SendSubscribeUnsubscribe ( addedTopicFilters , null , cancellationToken ) . ConfigureAwait ( false ) ;
565570 await HandleSubscriptionsResultAsync ( subscribeUnsubscribeResult ) . ConfigureAwait ( false ) ;
566571
567572 var removedTopicFilters = new List < string > ( ) ;
@@ -571,13 +576,13 @@ async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancell
571576
572577 if ( removedTopicFilters . Count == Options . MaxTopicFiltersInSubscribeUnsubscribePackets )
573578 {
574- subscribeUnsubscribeResult = await SendSubscribeUnsubscribe ( null , removedTopicFilters ) . ConfigureAwait ( false ) ;
579+ subscribeUnsubscribeResult = await SendSubscribeUnsubscribe ( null , removedTopicFilters , cancellationToken ) . ConfigureAwait ( false ) ;
575580 removedTopicFilters . Clear ( ) ;
576581 await HandleSubscriptionsResultAsync ( subscribeUnsubscribeResult ) . ConfigureAwait ( false ) ;
577582 }
578583 }
579584
580- subscribeUnsubscribeResult = await SendSubscribeUnsubscribe ( null , removedTopicFilters ) . ConfigureAwait ( false ) ;
585+ subscribeUnsubscribeResult = await SendSubscribeUnsubscribe ( null , removedTopicFilters , cancellationToken ) . ConfigureAwait ( false ) ;
581586 await HandleSubscriptionsResultAsync ( subscribeUnsubscribeResult ) . ConfigureAwait ( false ) ;
582587 }
583588 }
@@ -592,7 +597,7 @@ async Task<ReconnectionResult> ReconnectIfRequiredAsync(CancellationToken cancel
592597 MqttClientConnectResult connectResult = null ;
593598 try
594599 {
595- using ( var connectTimeout = new CancellationTokenSource ( Options . ClientOptions . Timeout ) )
600+ using ( var connectTimeout = NewTimeoutToken ( cancellationToken ) )
596601 {
597602 connectResult = await InternalClient . ConnectAsync ( Options . ClientOptions , connectTimeout . Token ) . ConfigureAwait ( false ) ;
598603 }
@@ -611,7 +616,7 @@ async Task<ReconnectionResult> ReconnectIfRequiredAsync(CancellationToken cancel
611616 }
612617 }
613618
614- async Task < SendSubscribeUnsubscribeResult > SendSubscribeUnsubscribe ( List < MqttTopicFilter > addedSubscriptions , List < string > removedSubscriptions )
619+ async Task < SendSubscribeUnsubscribeResult > SendSubscribeUnsubscribe ( List < MqttTopicFilter > addedSubscriptions , List < string > removedSubscriptions , CancellationToken cancellationToken )
615620 {
616621 var subscribeResults = new List < MqttClientSubscribeResult > ( ) ;
617622 var unsubscribeResults = new List < MqttClientUnsubscribeResult > ( ) ;
@@ -626,8 +631,11 @@ async Task<SendSubscribeUnsubscribeResult> SendSubscribeUnsubscribe(List<MqttTop
626631 unsubscribeOptionsBuilder . WithTopicFilter ( removedSubscription ) ;
627632 }
628633
629- var unsubscribeResult = await InternalClient . UnsubscribeAsync ( unsubscribeOptionsBuilder . Build ( ) ) . ConfigureAwait ( false ) ;
630- unsubscribeResults . Add ( unsubscribeResult ) ;
634+ using ( var unsubscribeTimeout = NewTimeoutToken ( cancellationToken ) )
635+ {
636+ var unsubscribeResult = await InternalClient . UnsubscribeAsync ( unsubscribeOptionsBuilder . Build ( ) , unsubscribeTimeout . Token ) . ConfigureAwait ( false ) ;
637+ unsubscribeResults . Add ( unsubscribeResult ) ;
638+ }
631639
632640 //clear because these worked, maybe the subscribe below will fail, only report those
633641 removedSubscriptions . Clear ( ) ;
@@ -642,8 +650,11 @@ async Task<SendSubscribeUnsubscribeResult> SendSubscribeUnsubscribe(List<MqttTop
642650 subscribeOptionsBuilder . WithTopicFilter ( addedSubscription ) ;
643651 }
644652
645- var subscribeResult = await InternalClient . SubscribeAsync ( subscribeOptionsBuilder . Build ( ) ) . ConfigureAwait ( false ) ;
646- subscribeResults . Add ( subscribeResult ) ;
653+ using ( var subscribeTimeout = NewTimeoutToken ( cancellationToken ) )
654+ {
655+ var subscribeResult = await InternalClient . SubscribeAsync ( subscribeOptionsBuilder . Build ( ) , subscribeTimeout . Token ) . ConfigureAwait ( false ) ;
656+ subscribeResults . Add ( subscribeResult ) ;
657+ }
647658 }
648659 }
649660 catch ( Exception exception )
@@ -697,15 +708,15 @@ async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
697708 {
698709 var oldConnectionState = InternalClient . IsConnected ;
699710 var connectionState = await ReconnectIfRequiredAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
700-
711+
701712 if ( connectionState == ReconnectionResult . NotConnected )
702713 {
703714 StopPublishing ( ) ;
704715 await Task . Delay ( Options . AutoReconnectDelay , cancellationToken ) . ConfigureAwait ( false ) ;
705716 }
706717 else if ( connectionState == ReconnectionResult . Reconnected )
707718 {
708- await PublishReconnectSubscriptionsAsync ( ) . ConfigureAwait ( false ) ;
719+ await PublishReconnectSubscriptionsAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
709720 StartPublishing ( ) ;
710721 }
711722 else if ( connectionState == ReconnectionResult . Recovered )
@@ -735,7 +746,7 @@ async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
735746 }
736747 }
737748
738- async Task TryPublishQueuedMessageAsync ( ManagedMqttApplicationMessage message )
749+ async Task TryPublishQueuedMessageAsync ( ManagedMqttApplicationMessage message , CancellationToken cancellationToken )
739750 {
740751 Exception transmitException = null ;
741752 bool acceptPublish = true ;
@@ -750,7 +761,10 @@ async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
750761
751762 if ( acceptPublish )
752763 {
753- await InternalClient . PublishAsync ( message . ApplicationMessage ) . ConfigureAwait ( false ) ;
764+ using ( var publishTimeout = NewTimeoutToken ( cancellationToken ) )
765+ {
766+ await InternalClient . PublishAsync ( message . ApplicationMessage , publishTimeout . Token ) . ConfigureAwait ( false ) ;
767+ }
754768 }
755769
756770 using ( await _messageQueueLock . EnterAsync ( ) . ConfigureAwait ( false ) ) //lock to avoid conflict with this.PublishAsync
0 commit comments