Skip to content

Commit d6fda2f

Browse files
Add scheduler batch size option (#1689) (#1704)
1 parent 0c4f155 commit d6fda2f

File tree

8 files changed

+34
-9
lines changed

8 files changed

+34
-9
lines changed

docs/content/user-guide/en/cap/configuration.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,13 @@ If set to true, we will use a database-based distributed lock to solve the probl
146146
147147
The interval of the collector processor deletes expired messages.
148148

149-
#### ConsumerThreadCount
149+
#### SchedulerBatchSize
150+
151+
> Default: 1000
152+
153+
Maximum number of delayed or queued messages fetched per scheduler cycle.
154+
155+
#### ConsumerThreadCount
150156

151157
> Default: 1
152158

docs/content/user-guide/zh/cap/configuration.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,12 @@ Group 在不同的 Broker 有不同的对应项。
152152
153153
收集器删除已经过期消息的时间间隔。
154154

155+
#### SchedulerBatchSize
156+
157+
> 默认值:1000
158+
159+
调度器每次循环获取的延迟或排队消息的最大数量。
160+
155161
#### FailedRetryCount
156162

157163
> 默认值:50

src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ public Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessag
228228
var result = PublishedMessages.Values.Where(x =>
229229
(x.StatusName == StatusName.Delayed && x.ExpiresAt < DateTime.Now.AddMinutes(2))
230230
|| (x.StatusName == StatusName.Queued && x.ExpiresAt < DateTime.Now.AddMinutes(-1)))
231+
.Take(_capOptions.Value.SchedulerBatchSize)
231232
.Select(x => (MediumMessage)x);
232233

233234
return scheduleTask(null!, result);

src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,9 @@ public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<Medium
336336
await collection.UpdateManyAsync(session, filter, update, cancellationToken: linkedTs.Token)
337337
.ConfigureAwait(false);
338338

339-
var queryResult = await collection.Find(session, filter).ToListAsync(linkedTs.Token)
339+
var queryResult = await collection.Find(session, filter)
340+
.Limit(_capOptions.Value.SchedulerBatchSize)
341+
.ToListAsync(linkedTs.Token)
340342
.ConfigureAwait(false);
341343

342344
var result = queryResult.Select(x => new MediumMessage

src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,13 +263,14 @@ public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<Medium
263263

264264
var sql =
265265
$"SELECT `Id`,`Content`,`Retries`,`Added`,`ExpiresAt` FROM `{_pubName}` WHERE `Version`=@Version " +
266-
$"AND ((`ExpiresAt`< @TwoMinutesLater AND `StatusName` = '{StatusName.Delayed}') OR (`ExpiresAt`< @OneMinutesAgo AND `StatusName` = '{StatusName.Queued}')) {lockSql};";
266+
$"AND ((`ExpiresAt`< @TwoMinutesLater AND `StatusName` = '{StatusName.Delayed}') OR (`ExpiresAt`< @OneMinutesAgo AND `StatusName` = '{StatusName.Queued}')) LIMIT @BatchSize {lockSql};";
267267

268268
object[] sqlParams =
269269
{
270270
new MySqlParameter("@Version", _capOptions.Value.Version),
271271
new MySqlParameter("@TwoMinutesLater", DateTime.Now.AddMinutes(2)),
272-
new MySqlParameter("@OneMinutesAgo", DateTime.Now.AddMinutes(-1))
272+
new MySqlParameter("@OneMinutesAgo", DateTime.Now.AddMinutes(-1)),
273+
new MySqlParameter("@BatchSize", _capOptions.Value.SchedulerBatchSize)
273274
};
274275

275276
await using var connection = new MySqlConnection(_options.Value.ConnectionString);

src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,14 @@ public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<Medium
257257
{
258258
var sql =
259259
$"SELECT \"Id\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\" FROM {_pubName} WHERE \"Version\"=@Version " +
260-
$"AND ((\"ExpiresAt\"< @TwoMinutesLater AND \"StatusName\" = '{StatusName.Delayed}') OR (\"ExpiresAt\"< @OneMinutesAgo AND \"StatusName\" = '{StatusName.Queued}')) FOR UPDATE SKIP LOCKED;";
260+
$"AND ((\"ExpiresAt\"< @TwoMinutesLater AND \"StatusName\" = '{StatusName.Delayed}') OR (\"ExpiresAt\"< @OneMinutesAgo AND \"StatusName\" = '{StatusName.Queued}')) FOR UPDATE SKIP LOCKED LIMIT @BatchSize;";
261261

262262
var sqlParams = new object[]
263263
{
264264
new NpgsqlParameter("@Version", _capOptions.Value.Version),
265265
new NpgsqlParameter("@TwoMinutesLater", DateTime.Now.AddMinutes(2)),
266-
new NpgsqlParameter("@OneMinutesAgo", QueuedMessageFetchTime())
266+
new NpgsqlParameter("@OneMinutesAgo", QueuedMessageFetchTime()),
267+
new NpgsqlParameter("@BatchSize", _capOptions.Value.SchedulerBatchSize)
267268
};
268269

269270
await using var connection = _options.Value.CreateConnection();

src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,17 +255,18 @@ public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<Medium
255255
CancellationToken token = default)
256256
{
257257
var sql = $@"
258-
SELECT Id, Content, Retries, Added, ExpiresAt FROM {_pubName} WITH (UPDLOCK, READPAST)
258+
SELECT TOP (@BatchSize) Id, Content, Retries, Added, ExpiresAt FROM {_pubName} WITH (UPDLOCK, READPAST)
259259
WHERE Version = @Version AND StatusName = '{StatusName.Delayed}' AND ExpiresAt < @TwoMinutesLater
260260
UNION ALL
261-
SELECT Id, Content, Retries, Added, ExpiresAt FROM {_pubName} WITH (UPDLOCK, READPAST)
261+
SELECT TOP (@BatchSize) Id, Content, Retries, Added, ExpiresAt FROM {_pubName} WITH (UPDLOCK, READPAST)
262262
WHERE Version = @Version AND StatusName = '{StatusName.Queued}' AND ExpiresAt < @OneMinutesAgo;";
263263

264264
object[] sqlParams =
265265
{
266266
new SqlParameter("@Version", _capOptions.Value.Version),
267267
new SqlParameter("@TwoMinutesLater", DateTime.Now.AddMinutes(2)),
268-
new SqlParameter("@OneMinutesAgo", DateTime.Now.AddMinutes(-1))
268+
new SqlParameter("@OneMinutesAgo", DateTime.Now.AddMinutes(-1)),
269+
new SqlParameter("@BatchSize", _capOptions.Value.SchedulerBatchSize)
269270
};
270271

271272
await using var connection = new SqlConnection(_options.Value.ConnectionString);

src/DotNetCore.CAP/CAP.Options.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public CapOptions()
3535
DefaultGroupName = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name!.ToLower();
3636
CollectorCleaningInterval = 300;
3737
FallbackWindowLookbackSeconds = 240;
38+
SchedulerBatchSize = 1000;
3839
}
3940

4041
internal IList<ICapOptionsExtension> Extensions { get; }
@@ -140,6 +141,12 @@ public CapOptions()
140141
/// </summary>
141142
public int CollectorCleaningInterval { get; set; }
142143

144+
/// <summary>
145+
/// Maximum number of delayed or queued messages fetched per scheduler cycle.
146+
/// Default is 1000.
147+
/// </summary>
148+
public int SchedulerBatchSize { get; set; }
149+
143150
/// <summary>
144151
/// Configure JSON serialization settings
145152
/// </summary>

0 commit comments

Comments
 (0)