Skip to content

Commit e919608

Browse files
committed
4472: Split into Compaction and Bulk Export from a users perspective
1 parent a1726f0 commit e919608

File tree

5 files changed

+112
-21
lines changed

5 files changed

+112
-21
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2022-2025 Crown Copyright
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package sleeper.common.task;
17+
18+
/**
19+
* Finds the number of messages on a queue, and starts up one EC2 or Fargate task for each, up to a
20+
* configurable maximum.
21+
*/
22+
public class RunBulkExportTasks {
23+
24+
public static void main(String[] args) {
25+
if (args.length != 1) {
26+
System.out.println("Usage: <instance-id> <number-of-tasks>");
27+
return;
28+
}
29+
RunDataProcessingTasks.run(args[0], Integer.parseInt(args[1]), false);
30+
}
31+
32+
private RunBulkExportTasks() {
33+
}
34+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2022-2025 Crown Copyright
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package sleeper.common.task;
17+
18+
/**
19+
* Finds the number of messages on a queue, and starts up one EC2 or Fargate task for each, up to a
20+
* configurable maximum.
21+
*/
22+
public class RunCompactionTasks {
23+
24+
public static void main(String[] args) {
25+
if (args.length != 1) {
26+
System.out.println("Usage: <instance-id> <number-of-tasks>");
27+
return;
28+
}
29+
RunDataProcessingTasks.run(args[0], Integer.parseInt(args[1]), true);
30+
}
31+
32+
private RunCompactionTasks() {
33+
}
34+
}

java/common/common-task/src/main/java/sleeper/common/task/RunDataProcessingTasks.java

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,7 @@ private RunDataProcessingTasks(Builder builder) {
7373
this.maximumRunningTasks = builder.maximumRunningTasks;
7474
}
7575

76-
public static void main(String[] args) {
77-
if (args.length != 2) {
78-
System.out.println("Usage: <instance-id> <number-of-tasks> <is-compaction-task>");
79-
return;
80-
}
81-
String instanceId = args[0];
82-
int numberOfTasks = Integer.parseInt(args[1]);
83-
boolean compactionTask = Boolean.parseBoolean(args[2]);
76+
public static void run(String instanceId, int numberOfTasks, boolean compactionTask) {
8477
try (S3Client s3Client = S3Client.create();
8578
EcsClient ecsClient = EcsClient.create();
8679
AutoScalingClient asClient = AutoScalingClient.create();
@@ -90,7 +83,7 @@ public static void main(String[] args) {
9083
if (compactionTask) {
9184
createForCompactions(instanceProperties, ecsClient, asClient, ec2Client).runToMeetTargetTasks(numberOfTasks);
9285
} else {
93-
createForBulkExport(instanceProperties, ecsClient, asClient, ec2Client).runToMeetTargetTasks(numberOfTasks);
86+
createForBulkExport(instanceProperties, ecsClient).runToMeetTargetTasks(numberOfTasks);
9487
}
9588
}
9689
}
@@ -99,7 +92,7 @@ public static Builder builder() {
9992
return new Builder();
10093
}
10194

102-
public static RunDataProcessingTasks createForBulkExport(InstanceProperties instanceProperties, EcsClient ecsClient, AutoScalingClient asClient, Ec2Client ec2Client) {
95+
public static RunDataProcessingTasks createForBulkExport(InstanceProperties instanceProperties, EcsClient ecsClient) {
10396
String clusterName = instanceProperties.get(BULK_EXPORT_CLUSTER);
10497
return builder()
10598
.hostScaler(new BulkExportTaskHostScaler())
@@ -113,17 +106,9 @@ public static RunDataProcessingTasks createForBulkExport(InstanceProperties inst
113106
.build();
114107
}
115108

116-
public static RunDataProcessingTasks createForCompactions(InstanceProperties instanceProperties, TaskCounts taskCounts, CompactionTaskHostScaler compactionHostScaler, TaskLauncher taskLauncher) {
117-
return setupForCompactions(instanceProperties)
118-
.hostScaler(compactionHostScaler)
119-
.taskCounts(taskCounts)
120-
.taskLauncher(taskLauncher)
121-
.build();
122-
}
123-
124109
public static RunDataProcessingTasks createForCompactions(InstanceProperties instanceProperties, EcsClient ecsClient, AutoScalingClient asClient, Ec2Client ec2Client) {
125110
String clusterName = instanceProperties.get(COMPACTION_CLUSTER);
126-
return setupForCompactions(instanceProperties)
111+
return builderForCompactions(instanceProperties)
127112
.hostScaler(EC2Scaler.create(instanceProperties, asClient, ec2Client))
128113
.taskCounts(() -> ECSTaskCount.getNumPendingAndRunningTasks(clusterName, ecsClient))
129114
.taskLauncher((numberOfTasks, checkAbort) -> launchTasks(ecsClient, instanceProperties, COMPACTION_CONTAINER_NAME, clusterName,
@@ -132,7 +117,7 @@ public static RunDataProcessingTasks createForCompactions(InstanceProperties ins
132117
.build();
133118
}
134119

135-
private static Builder setupForCompactions(InstanceProperties instanceProperties) {
120+
public static Builder builderForCompactions(InstanceProperties instanceProperties) {
136121
return builder()
137122
.sqsJobQueueUrl(instanceProperties.get(COMPACTION_JOB_QUEUE_URL))
138123
.maximumRunningTasks(instanceProperties.getInt(MAXIMUM_CONCURRENT_COMPACTION_TASKS));

java/common/common-task/src/test/java/sleeper/common/task/RunDataProcessingTasksTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,11 @@ private void runToMeetTargetTasks(int requestedTasks, TaskCounts taskCounts) {
416416
}
417417

418418
private RunDataProcessingTasks taskRunner(TaskCounts taskCounts) {
419-
return RunDataProcessingTasks.createForCompactions(instanceProperties, taskCounts, hostScaler(), taskLauncher());
419+
return RunDataProcessingTasks.builderForCompactions(instanceProperties)
420+
.hostScaler(hostScaler())
421+
.taskCounts(taskCounts)
422+
.taskLauncher(taskLauncher())
423+
.build();
420424
}
421425

422426
private CompactionTaskHostScaler hostScaler() {
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#!/usr/bin/env bash
2+
# Copyright 2022-2025 Crown Copyright
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
set -e
17+
unset CDPATH
18+
19+
if [ "$#" -ne 2 ]; then
20+
echo "Usage: $0 <instance-id> <number-of-tasks>"
21+
exit 1
22+
fi
23+
24+
SCRIPTS_DIR=$(cd "$(dirname "$0")" && cd .. && pwd)
25+
26+
TEMPLATE_DIR=${SCRIPTS_DIR}/templates
27+
JAR_DIR=${SCRIPTS_DIR}/jars
28+
29+
VERSION=$(cat "${TEMPLATE_DIR}/version.txt")
30+
31+
echo "-------------------------------------------------------"
32+
echo "Running bulk export tasks"
33+
echo "-------------------------------------------------------"
34+
java -cp "${JAR_DIR}/clients-${VERSION}-utility.jar" sleeper.common.task.RunBulkExportTasks "$@"

0 commit comments

Comments
 (0)