Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Worker/Core/DurableTaskWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public DataConverter DataConverter
/// or entity) can be processed concurrently by the worker. It is recommended to set these values based on the
/// expected workload and the resources available on the machine running the worker.
/// </remarks>
public ConcurrencyOptions Concurrency { get; } = new();
public ConcurrencyOptions Concurrency { get; init; } = new();

/// <summary>
/// Gets or sets the versioning options for the Durable Task worker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ public static class DurableTaskWorkerBuilderExtensions
/// <b>Note:</b> only 1 instance of gRPC worker is supported per sidecar.
/// </remarks>
public static IDurableTaskWorkerBuilder UseGrpc(this IDurableTaskWorkerBuilder builder)
=> builder.UseGrpc(opt => { });
=> builder.UseGrpc(opt =>
{
});

/// <summary>
/// Configures the <see cref="IDurableTaskWorkerBuilder" /> to be a gRPC client.
Expand Down
372 changes: 189 additions & 183 deletions src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Large diffs are not rendered by default.

118 changes: 99 additions & 19 deletions src/Worker/Grpc/GrpcDurableTaskWorker.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Diagnostics;
using Dapr.DurableTask.Worker.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand All @@ -10,13 +11,14 @@ namespace Dapr.DurableTask.Worker.Grpc;
/// <summary>
/// The gRPC Durable Task worker.
/// </summary>
sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
partial class GrpcDurableTaskWorker : DurableTaskWorker
{
readonly GrpcDurableTaskWorkerOptions grpcOptions;
readonly DurableTaskWorkerOptions workerOptions;
readonly IServiceProvider services;
readonly ILoggerFactory loggerFactory;
readonly ILogger logger;
int reconnectAttempts;

/// <summary>
/// Initializes a new instance of the <see cref="GrpcDurableTaskWorker" /> class.
Expand All @@ -43,57 +45,135 @@ public GrpcDurableTaskWorker(
this.logger = loggerFactory.CreateLogger("Dapr.DurableTask");
}

/// <inheritdoc />
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
/// <summary>
/// Creates call options with appropriate settings for long-running connections.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>CallOptions configured for long-running connections.</returns>
internal CallOptions CreateCallOptions(CancellationToken cancellationToken)
{
await using AsyncDisposable disposable = this.GetCallInvoker(out CallInvoker callInvoker, out string address);
this.logger.StartingTaskHubWorker(address);
await new Processor(this, new(callInvoker)).ExecuteAsync(stoppingToken);
// Create call options with NO deadline to ensure unlimited connection time
// This aligns with our channel settings for long-running connections
var options = new CallOptions(cancellationToken: cancellationToken);

// By not setting a Deadline property, we ensure the connection can
// stay open indefinitely, which matches our channel settings
this.logger.ConfiguringGrpcCallOptions();

return options;
}

#if NET6_0_OR_GREATER
static GrpcChannel GetChannel(string? address)
/// <inheritdoc />
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (string.IsNullOrEmpty(address))
while (!stoppingToken.IsCancellationRequested)
{
address = "http://localhost:4001";
try
{
// Reset reconnect counter when we start a new attempt
if (this.reconnectAttempts > 0)
{
this.logger.StartingReconnectAttempt(this.reconnectAttempts);
}

await using AsyncDisposable disposable =
this.GetCallInvoker(out CallInvoker callInvoker, out string address);
this.logger.StartingTaskHubWorker(address);

var stopwatch = Stopwatch.StartNew();
await new Processor(this, new(callInvoker)).ExecuteAsync(stoppingToken);
stopwatch.Stop();

this.logger.TaskHubWorkerExited(stopwatch.ElapsedMilliseconds);

// If we got here without an exception, break out of the retry loop
break;
}
catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
{
this.reconnectAttempts++;

// Log exception with detailed context
this.logger.TaskHubWorkerError(this.reconnectAttempts, ex.GetType().Name, ex.Message, ex);

// Add a brief delay before retrying to avoid tight CPU-bound loops
await Task.Delay(
TimeSpan.FromSeconds(Math.Min(30, Math.Pow(2, Math.Min(this.reconnectAttempts, 5)))),
stoppingToken);
}
catch (Exception ex)
{
this.logger.UnexpectedError(ex, nameof(GrpcDurableTaskWorker));
throw;
}
}

return GrpcChannel.ForAddress(address);
this.logger.CancellationRequested($"Cancellation handled at {nameof(this.ExecuteAsync)} in {nameof(GrpcDurableTaskWorker)}");
}
#endif

#if NETSTANDARD2_0
static GrpcChannel GetChannel(string? address)
{
if (string.IsNullOrEmpty(address))
{
address = "localhost:4001";
address = "http://localhost:4001";
}

return new(address, ChannelCredentials.Insecure);
// Create and configure the gRPC channel options for long-lived connections
var channelOptions = new GrpcChannelOptions
{
// No message size limit
MaxReceiveMessageSize = null,

// Configure keep-alive settings to maintain long-lived connections
HttpHandler = new SocketsHttpHandler
{
// Enable keep-alive
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always,
KeepAlivePingDelay = TimeSpan.FromSeconds(30),
KeepAlivePingTimeout = TimeSpan.FromSeconds(30),

// Pooled connections are reused and won't time out from inactivity
EnableMultipleHttp2Connections = true,

// Set a very long connection lifetime - this allows a controlled connection refresh strategy
PooledConnectionLifetime = TimeSpan.FromDays(1),

// Disable idle timeout entirely
PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
},

DisposeHttpClient = true,
};

return GrpcChannel.ForAddress(address, channelOptions);
}
#endif

AsyncDisposable GetCallInvoker(out CallInvoker callInvoker, out string address)
{
if (this.grpcOptions.Channel is GrpcChannel c)
if (this.grpcOptions.Channel is { } c)
{
this.logger.GrpcChannelTarget(c.Target);
callInvoker = c.CreateCallInvoker();
address = c.Target;
return default;
}

if (this.grpcOptions.CallInvoker is CallInvoker invoker)
if (this.grpcOptions.CallInvoker is { } invoker)
{
this.logger.SelectGrpcCallInvoker();
callInvoker = invoker;
address = "(unspecified)";
return default;
}

this.logger.CreatingGrpcChannelForAddress(this.grpcOptions.Address);
c = GetChannel(this.grpcOptions.Address);
callInvoker = c.CreateCallInvoker();
address = c.Target;
return new AsyncDisposable(() => new(c.ShutdownAsync()));
return new AsyncDisposable(() =>
{
this.logger.ShuttingDownGrpcChannel(c.Target);
return new(c.ShutdownAsync());
});
}
}
30 changes: 0 additions & 30 deletions src/Worker/Grpc/Internal/InternalOptionsExtensions.cs

This file was deleted.

52 changes: 50 additions & 2 deletions src/Worker/Grpc/Logs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ static partial class Logs
[LoggerMessage(EventId = 2, Level = LogLevel.Information, Message = "Durable Task gRPC worker has disconnected from gRPC server.")]
public static partial void SidecarDisconnected(this ILogger logger);

[LoggerMessage(EventId = 3, Level = LogLevel.Information, Message = "The gRPC server for Durable Task gRPC worker is unavailable. Will continue retrying.")]
public static partial void SidecarUnavailable(this ILogger logger);
[LoggerMessage(EventId = 3, Level = LogLevel.Information, Message = "Sidecar unavailable after {connectionDuration}: {status} {statusCode} {message}")]
public static partial void SidecarUnavailableWithDetails(this ILogger logger, string connectionDuration, Status status, StatusCode statusCode, string message);

[LoggerMessage(EventId = 4, Level = LogLevel.Information, Message = "Sidecar work-item streaming connection established.")]
public static partial void EstablishedWorkItemConnection(this ILogger logger);
Expand Down Expand Up @@ -57,5 +57,53 @@ static partial class Logs

[LoggerMessage(EventId = 58, Level = LogLevel.Information, Message = "Abandoning orchestration. InstanceId = '{instanceId}'. Completion token = '{completionToken}'")]
public static partial void AbandoningOrchestrationDueToVersioning(this ILogger logger, string instanceId, string completionToken);

[LoggerMessage(EventId = 59, Level = LogLevel.Debug, Message = "Cancellation requested. Message: '{message}'")]
public static partial void CancellationRequested(this ILogger logger, string message);

[LoggerMessage(EventId = 60, Level = LogLevel.Debug, Message = "Starting reconnection attempt #{attemptCount}")]
public static partial void StartingReconnectAttempt(this ILogger logger, int attemptCount);

[LoggerMessage(EventId = 61, Level = LogLevel.Debug, Message = "Task hub worker exited after {elapsedTimeMs} ms")]
public static partial void TaskHubWorkerExited(this ILogger logger, long elapsedTimeMs);

[LoggerMessage(EventId = 62, Level = LogLevel.Debug, Message = "Error in task hub worker, attempt #{reconnectionAttempts}: {exceptionType}: {exceptionMessage}")]
public static partial void TaskHubWorkerError(this ILogger logger, int reconnectionAttempts, string exceptionType, string exceptionMessage, Exception ex);

[LoggerMessage(EventId = 63, Level = LogLevel.Debug, Message = "Using provided gRPC channel with target '{target}'")]
public static partial void GrpcChannelTarget(this ILogger logger, string target);

[LoggerMessage(EventId = 64, Level = LogLevel.Debug, Message = "Using provided CallInvoker")]
public static partial void SelectGrpcCallInvoker(this ILogger logger);

[LoggerMessage(EventId = 65, Level = LogLevel.Debug, Message = "Creating new gRPC channel for address '{address}'")]
public static partial void CreatingGrpcChannelForAddress(this ILogger logger, string address);

[LoggerMessage(EventId = 66, Level = LogLevel.Debug, Message = "Shutting down gRPC channel for address '{address}'")]
public static partial void ShuttingDownGrpcChannel(this ILogger logger, string address);

[LoggerMessage(EventId = 67, Level = LogLevel.Debug, Message = "Configuring gRPC call with no deadline constraint")]
public static partial void ConfiguringGrpcCallOptions(this ILogger logger);

[LoggerMessage(EventId = 68, Level = LogLevel.Debug, Message = "Opening stream connection to get work items")]
public static partial void OpeningTaskStream(this ILogger logger);

[LoggerMessage(EventId = 69, Level = LogLevel.Debug, Message = "Received work item of type '{workItemType}' at '{lastActivityTimestamp}'")]
public static partial void ReceivedWorkItem(this ILogger logger, string workItemType, DateTime lastActivityTimestamp);

[LoggerMessage(EventId = 70, Level = LogLevel.Debug, Message = "Connection stats: Duration={connectionDuration}, LastActivity={timeSinceLastActivity}, WorkItemsProcessed={workItemsProcessed}")]
public static partial void ConnectionStats(this ILogger logger, string connectionDuration, string timeSinceLastActivity, int workItemsProcessed);

[LoggerMessage(EventId = 71, Level = LogLevel.Warning, Message = "Work item stream ended gracefully after {connectionDuration}. This is unusual but not necessarily an error.")]
public static partial void StreamEndedGracefully(this ILogger logger, string connectionDuration);

[LoggerMessage(EventId = 72, Level = LogLevel.Warning, Message = "gRPC call cancelled after {connectionDuration}: {status} {statusCode} {message}")]
public static partial void GrpcCallCancelled(this ILogger logger, string connectionDuration, Status status, StatusCode statusCode, string message);

[LoggerMessage(EventId = 73, Level = LogLevel.Warning, Message = "Unexpected error in gRPC worker after {connectionDuration}: {exceptionType}: {exceptionMessage}")]
public static partial void GrpcCallUnexpectedError(this ILogger logger, string connectionDuration, string exceptionType, string exceptionMessage, Exception ex);

[LoggerMessage(EventId = 74, Level = LogLevel.Debug, Message = "Waiting {delaySeconds} seconds before reconnection attempt #{reconnectAttempt}")]
public static partial void ReconnectionDelay(this ILogger logger, int delaySeconds, int reconnectAttempt);
}
}
Loading
Loading