diff --git a/java/bulk-export/bulk-export-task-creator/src/main/java/sleeper/bulkexport/taskcreator/RunLeafPartitionBulkExportTasks.java b/java/bulk-export/bulk-export-task-creator/src/main/java/sleeper/bulkexport/taskcreator/RunLeafPartitionBulkExportTasks.java
deleted file mode 100644
index 91cee6e570f..00000000000
--- a/java/bulk-export/bulk-export-task-creator/src/main/java/sleeper/bulkexport/taskcreator/RunLeafPartitionBulkExportTasks.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Copyright 2022-2025 Crown Copyright
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package sleeper.bulkexport.taskcreator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.ecs.EcsClient;
-import software.amazon.awssdk.services.ecs.model.AwsVpcConfiguration;
-import software.amazon.awssdk.services.ecs.model.ContainerOverride;
-import software.amazon.awssdk.services.ecs.model.LaunchType;
-import software.amazon.awssdk.services.ecs.model.NetworkConfiguration;
-import software.amazon.awssdk.services.ecs.model.PropagateTags;
-import software.amazon.awssdk.services.ecs.model.RunTaskRequest;
-import software.amazon.awssdk.services.ecs.model.TaskOverride;
-
-import sleeper.common.task.ECSTaskCount;
-import sleeper.common.task.QueueMessageCount;
-import sleeper.common.task.RunECSTasks;
-import sleeper.core.properties.instance.InstanceProperties;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.function.BooleanSupplier;
-
-import static sleeper.core.ContainerConstants.BULK_EXPORT_CONTAINER_NAME;
-import static sleeper.core.properties.instance.BulkExportProperty.MAXIMUM_CONCURRENT_BULK_EXPORT_TASKS;
-import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_EXPORT_CLUSTER;
-import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_EXPORT_TASK_FARGATE_DEFINITION_FAMILY;
-import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
-import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.LEAF_PARTITION_BULK_EXPORT_QUEUE_URL;
-import static sleeper.core.properties.instance.CommonProperty.ECS_SECURITY_GROUPS;
-import static sleeper.core.properties.instance.CommonProperty.FARGATE_VERSION;
-import static sleeper.core.properties.instance.CommonProperty.SUBNETS;
-
-/**
- * Finds the number of messages on a queue, and starts up a Fargate
- * task for each, up to a configurable maximum.
- */
-public class RunLeafPartitionBulkExportTasks {
- private static final Logger LOGGER = LoggerFactory.getLogger(RunLeafPartitionBulkExportTasks.class);
-
- private final InstanceProperties instanceProperties;
- private final TaskCounts taskCounts;
- private final TaskLauncher taskLauncher;
-
- public RunLeafPartitionBulkExportTasks(
- InstanceProperties instanceProperties, EcsClient ecsClient) {
- this(instanceProperties,
- () -> ECSTaskCount.getNumPendingAndRunningTasks(instanceProperties.get(BULK_EXPORT_CLUSTER), ecsClient),
- (numberOfTasks, checkAbort) -> launchTasks(ecsClient, instanceProperties, numberOfTasks, checkAbort));
- }
-
- public RunLeafPartitionBulkExportTasks(InstanceProperties instanceProperties, TaskCounts taskCounts,
- TaskLauncher taskLauncher) {
- this.instanceProperties = instanceProperties;
- this.taskCounts = taskCounts;
- this.taskLauncher = taskLauncher;
- }
-
- /**
- * Interface for getting the number of running and pending tasks.
- */
- public interface TaskCounts {
- /**
- * Get the number of running and pending tasks.
- *
- * @return the number of running and pending tasks
- */
- int getRunningAndPending();
- }
-
- /**
- * Interface for launching tasks.
- */
- public interface TaskLauncher {
- /**
- * Launches tasks.
- *
- * @param numberOfTasksToCreate the number of tasks to create
- * @param checkAbort a condition under which launching will be
- * aborted
- */
- void launchTasks(int numberOfTasksToCreate, BooleanSupplier checkAbort);
- }
-
- /**
- * Run the tasks in batches.
- *
- * @param queueMessageCount the queue message count
- */
- public void run(QueueMessageCount.Client queueMessageCount) {
- long startTime = System.currentTimeMillis();
- String sqsJobQueueUrl = instanceProperties.get(LEAF_PARTITION_BULK_EXPORT_QUEUE_URL);
- int maximumRunningTasks = instanceProperties.getInt(MAXIMUM_CONCURRENT_BULK_EXPORT_TASKS);
- LOGGER.info("Queue URL is {}", sqsJobQueueUrl);
- // Find out number of messages in queue that are not being processed
- int queueSize = queueMessageCount.getQueueMessageCount(sqsJobQueueUrl)
- .getApproximateNumberOfMessages();
- LOGGER.info("Queue size is {}", queueSize);
- // Request 1 task for each item on the queue
- LOGGER.info("Maximum concurrent tasks is {}", maximumRunningTasks);
- int numRunningAndPendingTasks = taskCounts.getRunningAndPending();
- LOGGER.info("Number of running and pending tasks is {}", numRunningAndPendingTasks);
-
- int maxTasksToCreate = maximumRunningTasks - numRunningAndPendingTasks;
- int numberOfTasksToCreate = Math.min(queueSize, maxTasksToCreate);
- int targetTasks = numRunningAndPendingTasks + numberOfTasksToCreate;
- scaleToHostsAndLaunchTasks(targetTasks, numberOfTasksToCreate, () -> {
- // This lambda is triggered every minute so abort once get
- // close to 1 minute
- if (System.currentTimeMillis() - startTime > 50 * 1000L) {
- LOGGER.info("Running for more than 50 seconds, aborting");
- return true;
- } else {
- return false;
- }
- });
- }
-
- /**
- * Ensure that a given number of bulk export tasks are running.
- *
- * If there are currently less tasks running, then new ones will be started.
- * The scaler will be invoked to ensure the ECS cluster can accommodate the
- * necessary new tasks
- * (may be a delay in launching instances, so initial task launch may fail).
- *
- * @param desiredTaskCount the number of tasks to ensure are running
- */
- public void runToMeetTargetTasks(int desiredTaskCount) {
- int numRunningAndPendingTasks = taskCounts.getRunningAndPending();
- LOGGER.info("Number of running and pending tasks is {}", numRunningAndPendingTasks);
- int numberOfTasksToCreate = desiredTaskCount - numRunningAndPendingTasks;
- // Instruct the scaler to ensure we have room for the correct number of running
- // tasks. We use max() here
- // because we are either at or need to grow to the desired number. If the
- // desiredTaskCount is LESS than
- // the number of running/pending tasks, we don't want to instruct the scaler to
- // shrink the ECS cluster.
- scaleToHostsAndLaunchTasks(Math.max(desiredTaskCount, numRunningAndPendingTasks), numberOfTasksToCreate,
- () -> false);
- }
-
- private void scaleToHostsAndLaunchTasks(int targetTasks, int createTasks, BooleanSupplier checkAbort) {
- LOGGER.info("Target number of tasks is {}", targetTasks);
- LOGGER.info("Tasks to create is {}", createTasks);
-
- if (targetTasks < 0) {
- throw new IllegalArgumentException("targetTasks is < 0");
- }
-
- if (createTasks < 1) {
- LOGGER.info("Finishing as no new tasks are needed");
- return;
- }
- taskLauncher.launchTasks(createTasks, checkAbort);
- }
-
- /**
- * Attempts to launch some tasks on ECS.
- *
- * @param ecsClient Amazon ECS client
- * @param instanceProperties Properties for instance
- * @param numberOfTasksToCreate number of tasks to create
- * @param checkAbort a condition under which launching will be
- * aborted
- */
- private static void launchTasks(
- EcsClient ecsClient, InstanceProperties instanceProperties,
- int numberOfTasksToCreate, BooleanSupplier checkAbort) {
- RunECSTasks.runTasks(builder -> builder
- .ecsClient(ecsClient)
- .runTaskRequest(createRunTaskRequest(instanceProperties))
- .numberOfTasksToCreate(numberOfTasksToCreate)
- .checkAbort(checkAbort));
- }
-
- /**
- * Creates a new task request that can be passed to ECS.
- *
- * @param instanceProperties the instance properties
- * @return the request for ECS
- * @throws IllegalArgumentException if launchType is FARGATE and
- * version is null
- */
- private static RunTaskRequest createRunTaskRequest(InstanceProperties instanceProperties) {
- TaskOverride override = createOverride(instanceProperties);
- RunTaskRequest.Builder runTaskRequest = RunTaskRequest.builder()
- .cluster(instanceProperties.get(BULK_EXPORT_CLUSTER))
- .overrides(override)
- .propagateTags(PropagateTags.TASK_DEFINITION);
-
- String fargateVersion = Objects.requireNonNull(instanceProperties.get(FARGATE_VERSION),
- "fargateVersion cannot be null");
- NetworkConfiguration networkConfiguration = networkConfig(instanceProperties);
- runTaskRequest.launchType(LaunchType.FARGATE)
- .platformVersion(fargateVersion)
- .networkConfiguration(networkConfiguration)
- .taskDefinition(instanceProperties.get(BULK_EXPORT_TASK_FARGATE_DEFINITION_FAMILY));
-
- return runTaskRequest.build();
- }
-
- /**
- * Create the container definition overrides for the task launch.
- *
- * @param instanceProperties the instance properties
- * @return the container definition overrides
- */
- private static TaskOverride createOverride(InstanceProperties instanceProperties) {
- ContainerOverride containerOverride = ContainerOverride.builder()
- .name(BULK_EXPORT_CONTAINER_NAME)
- .command(List.of(instanceProperties.get(CONFIG_BUCKET)))
- .build();
- return TaskOverride.builder()
- .containerOverrides(containerOverride)
- .build();
- }
-
- /**
- * Create the container networking configuration.
- *
- * @param instanceProperties the instance properties
- * @return task network configuration
- */
- private static NetworkConfiguration networkConfig(InstanceProperties instanceProperties) {
- AwsVpcConfiguration vpcConfiguration = AwsVpcConfiguration.builder()
- .subnets(instanceProperties.getList(SUBNETS))
- .securityGroups(instanceProperties.getList(ECS_SECURITY_GROUPS))
- .build();
- return NetworkConfiguration.builder()
- .awsvpcConfiguration(vpcConfiguration)
- .build();
- }
-}
diff --git a/java/bulk-export/bulk-export-task-creator/src/main/java/sleeper/bulkexport/taskcreator/SqsTriggeredBulkExportTaskRunnerLambda.java b/java/bulk-export/bulk-export-task-creator/src/main/java/sleeper/bulkexport/taskcreator/SqsTriggeredBulkExportTaskRunnerLambda.java
index 323eab4ebda..14d4fbc6ada 100644
--- a/java/bulk-export/bulk-export-task-creator/src/main/java/sleeper/bulkexport/taskcreator/SqsTriggeredBulkExportTaskRunnerLambda.java
+++ b/java/bulk-export/bulk-export-task-creator/src/main/java/sleeper/bulkexport/taskcreator/SqsTriggeredBulkExportTaskRunnerLambda.java
@@ -24,6 +24,7 @@
import software.amazon.awssdk.services.sqs.SqsClient;
import sleeper.common.task.QueueMessageCount;
+import sleeper.common.task.RunDataProcessingTasks;
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.core.properties.instance.InstanceProperties;
@@ -38,7 +39,7 @@ public class SqsTriggeredBulkExportTaskRunnerLambda {
private static final Logger LOGGER = LoggerFactory.getLogger(SqsTriggeredBulkExportTaskRunnerLambda.class);
private final QueueMessageCount.Client queueMessageCount;
- private final RunLeafPartitionBulkExportTasks runLeafPartitionBulkExportTasks;
+ private final RunDataProcessingTasks runTasks;
public SqsTriggeredBulkExportTaskRunnerLambda() {
String s3Bucket = validateParameter(CONFIG_BUCKET.toEnvironmentVariable());
@@ -46,8 +47,9 @@ public SqsTriggeredBulkExportTaskRunnerLambda() {
S3Client s3Client = S3Client.create();
EcsClient ecsClient = EcsClient.create();
InstanceProperties instanceProperties = S3InstanceProperties.loadFromBucket(s3Client, s3Bucket);
- runLeafPartitionBulkExportTasks = new RunLeafPartitionBulkExportTasks(instanceProperties, ecsClient);
- queueMessageCount = QueueMessageCount.withSqsClient(sqsClient);
+
+ this.runTasks = RunDataProcessingTasks.createForBulkExport(instanceProperties, ecsClient);
+ this.queueMessageCount = QueueMessageCount.withSqsClient(sqsClient);
}
/**
@@ -58,7 +60,7 @@ public SqsTriggeredBulkExportTaskRunnerLambda() {
* @param context the lambda context
*/
public void handleRequest(SQSEvent input, Context context) {
- runLeafPartitionBulkExportTasks.run(queueMessageCount);
+ runTasks.run(queueMessageCount);
}
private static String validateParameter(String parameterName) {
diff --git a/java/common/common-task/src/main/java/sleeper/common/task/BulkExportTaskHostScaler.java b/java/common/common-task/src/main/java/sleeper/common/task/BulkExportTaskHostScaler.java
new file mode 100644
index 00000000000..c7dd60f3481
--- /dev/null
+++ b/java/common/common-task/src/main/java/sleeper/common/task/BulkExportTaskHostScaler.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2022-2025 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.common.task;
+
+public class BulkExportTaskHostScaler implements TaskHostScaler {
+
+ @Override
+ public void scaleTo(int numberOfTasks) {
+ // Bulk Export doesn't currently scale so do nothing
+ }
+
+}
diff --git a/java/common/common-task/src/main/java/sleeper/common/task/CompactionTaskHostScaler.java b/java/common/common-task/src/main/java/sleeper/common/task/CompactionTaskHostScaler.java
index 51edaf3eeab..41d91eab409 100644
--- a/java/common/common-task/src/main/java/sleeper/common/task/CompactionTaskHostScaler.java
+++ b/java/common/common-task/src/main/java/sleeper/common/task/CompactionTaskHostScaler.java
@@ -37,7 +37,7 @@
* Autoscaler to scale EC2 instances for the desired number of compaction tasks. This makes decisions on how many
* instances to start and stop based on the amount of work there is to do.
*/
-public class CompactionTaskHostScaler {
+public class CompactionTaskHostScaler implements TaskHostScaler {
private final InstanceProperties instanceProperties;
private final CheckAutoScalingGroup asgQuery;
@@ -63,6 +63,7 @@ public CompactionTaskHostScaler(
*
* @param numberOfTasks total number of tasks to provide capacity for
*/
+ @Override
public void scaleTo(int numberOfTasks) {
String launchType = instanceProperties.get(COMPACTION_ECS_LAUNCHTYPE);
// Only need scaler for EC2
diff --git a/java/common/common-task/src/main/java/sleeper/common/task/RunBulkExportTasks.java b/java/common/common-task/src/main/java/sleeper/common/task/RunBulkExportTasks.java
new file mode 100644
index 00000000000..d3dcafb6f57
--- /dev/null
+++ b/java/common/common-task/src/main/java/sleeper/common/task/RunBulkExportTasks.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2022-2025 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.common.task;
+
+import software.amazon.awssdk.services.ecs.EcsClient;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import sleeper.configuration.properties.S3InstanceProperties;
+import sleeper.core.properties.instance.InstanceProperties;
+
+/**
+ * Finds the number of messages on a queue, and starts up one EC2 or Fargate task for each, up to a
+ * configurable maximum.
+ */
+public class RunBulkExportTasks {
+
+ public static void main(String[] args) {
+ if (args.length != 2) {
+ System.out.println("Usage: ");
+ return;
+ }
+
+ try (S3Client s3Client = S3Client.create();
+ EcsClient ecsClient = EcsClient.create()) {
+ String instanceId = args[0];
+ int numberOfTasks = Integer.parseInt(args[1]);
+
+ InstanceProperties instanceProperties = S3InstanceProperties.loadGivenInstanceId(s3Client, instanceId);
+
+ RunDataProcessingTasks.createForBulkExport(instanceProperties, ecsClient)
+ .runToMeetTargetTasks(numberOfTasks);
+ }
+ }
+
+ private RunBulkExportTasks() {
+ }
+}
diff --git a/java/common/common-task/src/main/java/sleeper/common/task/RunCompactionTasks.java b/java/common/common-task/src/main/java/sleeper/common/task/RunCompactionTasks.java
index a144f5d7ec0..894500aa493 100644
--- a/java/common/common-task/src/main/java/sleeper/common/task/RunCompactionTasks.java
+++ b/java/common/common-task/src/main/java/sleeper/common/task/RunCompactionTasks.java
@@ -15,236 +15,40 @@
*/
package sleeper.common.task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.autoscaling.AutoScalingClient;
import software.amazon.awssdk.services.ec2.Ec2Client;
import software.amazon.awssdk.services.ecs.EcsClient;
-import software.amazon.awssdk.services.ecs.model.AwsVpcConfiguration;
-import software.amazon.awssdk.services.ecs.model.ContainerOverride;
-import software.amazon.awssdk.services.ecs.model.LaunchType;
-import software.amazon.awssdk.services.ecs.model.NetworkConfiguration;
-import software.amazon.awssdk.services.ecs.model.PropagateTags;
-import software.amazon.awssdk.services.ecs.model.RunTaskRequest;
-import software.amazon.awssdk.services.ecs.model.TaskOverride;
import software.amazon.awssdk.services.s3.S3Client;
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.core.properties.instance.InstanceProperties;
-import sleeper.core.properties.model.CompactionECSLaunchType;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.function.BooleanSupplier;
-
-import static sleeper.core.ContainerConstants.COMPACTION_CONTAINER_NAME;
-import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_CLUSTER;
-import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
-import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_TASK_EC2_DEFINITION_FAMILY;
-import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_TASK_FARGATE_DEFINITION_FAMILY;
-import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
-import static sleeper.core.properties.instance.CommonProperty.ECS_SECURITY_GROUPS;
-import static sleeper.core.properties.instance.CommonProperty.FARGATE_VERSION;
-import static sleeper.core.properties.instance.CommonProperty.SUBNETS;
-import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_ECS_LAUNCHTYPE;
-import static sleeper.core.properties.instance.CompactionProperty.MAXIMUM_CONCURRENT_COMPACTION_TASKS;
/**
* Finds the number of messages on a queue, and starts up one EC2 or Fargate task for each, up to a
* configurable maximum.
*/
public class RunCompactionTasks {
- private static final Logger LOGGER = LoggerFactory.getLogger(RunCompactionTasks.class);
-
- private final InstanceProperties instanceProperties;
- private final TaskCounts taskCounts;
- private final CompactionTaskHostScaler hostScaler;
- private final TaskLauncher taskLauncher;
-
- public RunCompactionTasks(
- InstanceProperties instanceProperties, EcsClient ecsClient, AutoScalingClient asClient, Ec2Client ec2Client) {
- this(instanceProperties,
- () -> ECSTaskCount.getNumPendingAndRunningTasks(instanceProperties.get(COMPACTION_CLUSTER), ecsClient),
- EC2Scaler.create(instanceProperties, asClient, ec2Client),
- (numberOfTasks, checkAbort) -> launchTasks(ecsClient, instanceProperties, numberOfTasks, checkAbort));
- }
-
- public RunCompactionTasks(
- InstanceProperties instanceProperties, TaskCounts taskCounts, CompactionTaskHostScaler hostScaler, TaskLauncher taskLauncher) {
- this.instanceProperties = instanceProperties;
- this.taskCounts = taskCounts;
- this.hostScaler = hostScaler;
- this.taskLauncher = taskLauncher;
- }
-
- public interface TaskCounts {
- int getRunningAndPending();
- }
-
- public interface TaskLauncher {
- void launchTasks(int numberOfTasksToCreate, BooleanSupplier checkAbort);
- }
-
- public void run(QueueMessageCount.Client queueMessageCount) {
- long startTime = System.currentTimeMillis();
- String sqsJobQueueUrl = instanceProperties.get(COMPACTION_JOB_QUEUE_URL);
- int maximumRunningTasks = instanceProperties.getInt(MAXIMUM_CONCURRENT_COMPACTION_TASKS);
- LOGGER.info("Queue URL is {}", sqsJobQueueUrl);
- // Find out number of messages in queue that are not being processed
- int queueSize = queueMessageCount.getQueueMessageCount(sqsJobQueueUrl)
- .getApproximateNumberOfMessages();
- LOGGER.info("Queue size is {}", queueSize);
- // Request 1 task for each item on the queue
- LOGGER.info("Maximum concurrent tasks is {}", maximumRunningTasks);
- int numRunningAndPendingTasks = taskCounts.getRunningAndPending();
- LOGGER.info("Number of running and pending tasks is {}", numRunningAndPendingTasks);
-
- int maxTasksToCreate = maximumRunningTasks - numRunningAndPendingTasks;
- int numberOfTasksToCreate = Math.min(queueSize, maxTasksToCreate);
- int targetTasks = numRunningAndPendingTasks + numberOfTasksToCreate;
- scaleToHostsAndLaunchTasks(targetTasks, numberOfTasksToCreate,
- () -> {
- // This lambda is triggered every minute so abort once get
- // close to 1 minute
- if (System.currentTimeMillis() - startTime > 50 * 1000L) {
- LOGGER.info("Running for more than 50 seconds, aborting");
- return true;
- } else {
- return false;
- }
- });
- }
-
- /**
- * Ensure that a given number of compaction tasks are running.
- *
- * If there are currently less tasks running, then new ones will be started.
- * The scaler will be invoked to ensure the ECS cluster can accommodate the necessary new tasks
- * (may be a delay in launching instances, so initial task launch may fail).
- *
- * @param desiredTaskCount the number of tasks to ensure are running
- */
- public void runToMeetTargetTasks(int desiredTaskCount) {
- int numRunningAndPendingTasks = taskCounts.getRunningAndPending();
- LOGGER.info("Number of running and pending tasks is {}", numRunningAndPendingTasks);
- int numberOfTasksToCreate = desiredTaskCount - numRunningAndPendingTasks;
- // Instruct the scaler to ensure we have room for the correct number of running tasks. We use max() here
- // because we are either at or need to grow to the desired number. If the desiredTaskCount is LESS than
- // the number of running/pending tasks, we don't want to instruct the scaler to shrink the ECS cluster.
- scaleToHostsAndLaunchTasks(Math.max(desiredTaskCount, numRunningAndPendingTasks), numberOfTasksToCreate, () -> false);
- }
-
- private void scaleToHostsAndLaunchTasks(int targetTasks, int createTasks, BooleanSupplier checkAbort) {
- LOGGER.info("Target number of tasks is {}", targetTasks);
- LOGGER.info("Tasks to create is {}", createTasks);
- if (targetTasks < 0) {
- throw new IllegalArgumentException("targetTasks is < 0");
- }
- hostScaler.scaleTo(targetTasks);
- if (createTasks < 1) {
- LOGGER.info("Finishing as no new tasks are needed");
- return;
- }
- taskLauncher.launchTasks(createTasks, checkAbort);
- }
-
- /**
- * Attempts to launch some tasks on ECS.
- *
- * @param ecsClient Amazon ECS client
- * @param instanceProperties Properties for instance
- * @param numberOfTasksToCreate number of tasks to create
- * @param checkAbort a condition under which launching will be aborted
- */
- private static void launchTasks(
- EcsClient ecsClient, InstanceProperties instanceProperties,
- int numberOfTasksToCreate, BooleanSupplier checkAbort) {
- RunECSTasks.runTasks(builder -> builder
- .ecsClient(ecsClient)
- .runTaskRequest(createRunTaskRequest(instanceProperties))
- .numberOfTasksToCreate(numberOfTasksToCreate)
- .checkAbort(checkAbort));
- }
-
- /**
- * Creates a new task request that can be passed to ECS.
- *
- * @param instanceProperties the instance properties
- * @return the request for ECS
- * @throws IllegalArgumentException if launchType is FARGATE and version is null
- */
- private static RunTaskRequest createRunTaskRequest(InstanceProperties instanceProperties) {
- TaskOverride override = createOverride(instanceProperties);
- RunTaskRequest.Builder runTaskRequest = RunTaskRequest.builder()
- .cluster(instanceProperties.get(COMPACTION_CLUSTER))
- .overrides(override)
- .propagateTags(PropagateTags.TASK_DEFINITION);
-
- CompactionECSLaunchType launchType = instanceProperties.getEnumValue(COMPACTION_ECS_LAUNCHTYPE, CompactionECSLaunchType.class);
- if (launchType == CompactionECSLaunchType.FARGATE) {
- String fargateVersion = Objects.requireNonNull(instanceProperties.get(FARGATE_VERSION), "fargateVersion cannot be null");
- NetworkConfiguration networkConfiguration = networkConfig(instanceProperties);
- runTaskRequest
- .launchType(LaunchType.FARGATE)
- .platformVersion(fargateVersion)
- .networkConfiguration(networkConfiguration)
- .taskDefinition(instanceProperties.get(COMPACTION_TASK_FARGATE_DEFINITION_FAMILY));
- } else if (launchType == CompactionECSLaunchType.EC2) {
- runTaskRequest
- .launchType(LaunchType.EC2)
- .taskDefinition(instanceProperties.get(COMPACTION_TASK_EC2_DEFINITION_FAMILY));
- } else {
- throw new IllegalArgumentException("Unrecognised ECS launch type: " + launchType);
- }
- return runTaskRequest.build();
- }
-
- /**
- * Create the container definition overrides for the task launch.
- *
- * @param instanceProperties the instance properties
- * @return the container definition overrides
- */
- private static TaskOverride createOverride(InstanceProperties instanceProperties) {
- ContainerOverride containerOverride = ContainerOverride.builder()
- .name(COMPACTION_CONTAINER_NAME)
- .command(List.of(instanceProperties.get(CONFIG_BUCKET)))
- .build();
- return TaskOverride.builder()
- .containerOverrides(containerOverride)
- .build();
- }
-
- /**
- * Create the container networking configuration.
- *
- * @param instanceProperties the instance properties
- * @return task network configuration
- */
- private static NetworkConfiguration networkConfig(InstanceProperties instanceProperties) {
- AwsVpcConfiguration vpcConfiguration = AwsVpcConfiguration.builder()
- .subnets(instanceProperties.getList(SUBNETS))
- .securityGroups(instanceProperties.getList(ECS_SECURITY_GROUPS))
- .build();
- return NetworkConfiguration.builder()
- .awsvpcConfiguration(vpcConfiguration)
- .build();
- }
public static void main(String[] args) {
if (args.length != 2) {
System.out.println("Usage: ");
return;
}
- String instanceId = args[0];
- int numberOfTasks = Integer.parseInt(args[1]);
+
try (S3Client s3Client = S3Client.create();
EcsClient ecsClient = EcsClient.create();
AutoScalingClient asClient = AutoScalingClient.create();
Ec2Client ec2Client = Ec2Client.create()) {
+ String instanceId = args[0];
+ int numberOfTasks = Integer.parseInt(args[1]);
+
InstanceProperties instanceProperties = S3InstanceProperties.loadGivenInstanceId(s3Client, instanceId);
- new RunCompactionTasks(instanceProperties, ecsClient, asClient, ec2Client)
+
+ RunDataProcessingTasks.createForCompactions(instanceProperties, ecsClient, asClient, ec2Client)
.runToMeetTargetTasks(numberOfTasks);
}
}
+
+ private RunCompactionTasks() {
+ }
}
diff --git a/java/common/common-task/src/main/java/sleeper/common/task/RunDataProcessingTasks.java b/java/common/common-task/src/main/java/sleeper/common/task/RunDataProcessingTasks.java
new file mode 100644
index 00000000000..a5f3584b52e
--- /dev/null
+++ b/java/common/common-task/src/main/java/sleeper/common/task/RunDataProcessingTasks.java
@@ -0,0 +1,387 @@
+/*
+ * Copyright 2022-2025 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.common.task;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.autoscaling.AutoScalingClient;
+import software.amazon.awssdk.services.ec2.Ec2Client;
+import software.amazon.awssdk.services.ecs.EcsClient;
+import software.amazon.awssdk.services.ecs.model.AwsVpcConfiguration;
+import software.amazon.awssdk.services.ecs.model.ContainerOverride;
+import software.amazon.awssdk.services.ecs.model.LaunchType;
+import software.amazon.awssdk.services.ecs.model.NetworkConfiguration;
+import software.amazon.awssdk.services.ecs.model.PropagateTags;
+import software.amazon.awssdk.services.ecs.model.RunTaskRequest;
+import software.amazon.awssdk.services.ecs.model.TaskOverride;
+
+import sleeper.core.properties.instance.InstanceProperties;
+import sleeper.core.properties.model.CompactionECSLaunchType;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+
+import static sleeper.core.ContainerConstants.BULK_EXPORT_CONTAINER_NAME;
+import static sleeper.core.ContainerConstants.COMPACTION_CONTAINER_NAME;
+import static sleeper.core.properties.instance.BulkExportProperty.MAXIMUM_CONCURRENT_BULK_EXPORT_TASKS;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_EXPORT_CLUSTER;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_EXPORT_TASK_FARGATE_DEFINITION_FAMILY;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_CLUSTER;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_TASK_EC2_DEFINITION_FAMILY;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_TASK_FARGATE_DEFINITION_FAMILY;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.LEAF_PARTITION_BULK_EXPORT_QUEUE_URL;
+import static sleeper.core.properties.instance.CommonProperty.ECS_SECURITY_GROUPS;
+import static sleeper.core.properties.instance.CommonProperty.FARGATE_VERSION;
+import static sleeper.core.properties.instance.CommonProperty.SUBNETS;
+import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_ECS_LAUNCHTYPE;
+import static sleeper.core.properties.instance.CompactionProperty.MAXIMUM_CONCURRENT_COMPACTION_TASKS;
+
+/**
+ * Finds the number of messages on a queue, and starts up one EC2 or Fargate task for each, up to a
+ * configurable maximum.
+ */
+public class RunDataProcessingTasks {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RunDataProcessingTasks.class);
+ private final TaskHostScaler hostScaler;
+ private final String sqsJobQueueUrl;
+ private final TaskCounts taskCounts;
+ private final TaskLauncher taskLauncher;
+ private final int maximumRunningTasks;
+
+ private RunDataProcessingTasks(Builder builder) {
+ this.hostScaler = builder.hostScaler;
+ this.sqsJobQueueUrl = builder.sqsJobQueueUrl;
+ this.taskCounts = builder.taskCounts;
+ this.taskLauncher = builder.taskLauncher;
+ this.maximumRunningTasks = builder.maximumRunningTasks;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static RunDataProcessingTasks createForBulkExport(InstanceProperties instanceProperties, EcsClient ecsClient) {
+ String clusterName = instanceProperties.get(BULK_EXPORT_CLUSTER);
+ return builder()
+ .hostScaler(new BulkExportTaskHostScaler())
+ .sqsJobQueueUrl(instanceProperties.get(LEAF_PARTITION_BULK_EXPORT_QUEUE_URL))
+ .taskCounts(() -> ECSTaskCount.getNumPendingAndRunningTasks(clusterName, ecsClient))
+ .taskLauncher((numberOfTasks, checkAbort) -> launchTasksForBulkExport(ecsClient, instanceProperties, BULK_EXPORT_CONTAINER_NAME,
+ clusterName, LaunchType.FARGATE,
+ instanceProperties.get(BULK_EXPORT_TASK_FARGATE_DEFINITION_FAMILY),
+ numberOfTasks, checkAbort))
+ .maximumRunningTasks(instanceProperties.getInt(MAXIMUM_CONCURRENT_BULK_EXPORT_TASKS))
+ .build();
+ }
+
+ public static RunDataProcessingTasks createForCompactions(InstanceProperties instanceProperties, EcsClient ecsClient, AutoScalingClient asClient, Ec2Client ec2Client) {
+ String clusterName = instanceProperties.get(COMPACTION_CLUSTER);
+ return builderForCompactions(instanceProperties)
+ .hostScaler(EC2Scaler.create(instanceProperties, asClient, ec2Client))
+ .taskCounts(() -> ECSTaskCount.getNumPendingAndRunningTasks(clusterName, ecsClient))
+ .taskLauncher((numberOfTasks, checkAbort) -> launchTasksForCompaction(ecsClient, instanceProperties, COMPACTION_CONTAINER_NAME, clusterName,
+ instanceProperties.getEnumValue(COMPACTION_ECS_LAUNCHTYPE, CompactionECSLaunchType.class),
+ instanceProperties.get(COMPACTION_TASK_FARGATE_DEFINITION_FAMILY), numberOfTasks, checkAbort))
+ .build();
+ }
+
+ public static Builder builderForCompactions(InstanceProperties instanceProperties) {
+ return builder()
+ .sqsJobQueueUrl(instanceProperties.get(COMPACTION_JOB_QUEUE_URL))
+ .maximumRunningTasks(instanceProperties.getInt(MAXIMUM_CONCURRENT_COMPACTION_TASKS));
+ }
+
+ public interface TaskCounts {
+ int getRunningAndPending();
+ }
+
+ public interface TaskLauncher {
+ void launchTasks(int numberOfTasksToCreate, BooleanSupplier checkAbort);
+ }
+
+ public void run(QueueMessageCount.Client queueMessageCount) {
+ long startTime = System.currentTimeMillis();
+ LOGGER.info("Queue URL is {}", sqsJobQueueUrl);
+ // Find out number of messages in queue that are not being processed
+ int queueSize = queueMessageCount.getQueueMessageCount(sqsJobQueueUrl)
+ .getApproximateNumberOfMessages();
+ LOGGER.info("Queue size is {}", queueSize);
+ // Request 1 task for each item on the queue
+ LOGGER.info("Maximum concurrent tasks is {}", maximumRunningTasks);
+ int numRunningAndPendingTasks = taskCounts.getRunningAndPending();
+ LOGGER.info("Number of running and pending tasks is {}", numRunningAndPendingTasks);
+
+ int maxTasksToCreate = maximumRunningTasks - numRunningAndPendingTasks;
+ int numberOfTasksToCreate = Math.min(queueSize, maxTasksToCreate);
+ int targetTasks = numRunningAndPendingTasks + numberOfTasksToCreate;
+ scaleToHostsAndLaunchTasks(targetTasks, numberOfTasksToCreate,
+ () -> {
+ // This lambda is triggered every minute so abort once get
+ // close to 1 minute
+ if (System.currentTimeMillis() - startTime > 50 * 1000L) {
+ LOGGER.info("Running for more than 50 seconds, aborting");
+ return true;
+ } else {
+ return false;
+ }
+ });
+ }
+
+ /**
+ * Ensure that a given number of compaction tasks are running.
+ *
+ * If there are currently less tasks running, then new ones will be started.
+ * The scaler will be invoked to ensure the ECS cluster can accommodate the necessary new tasks
+ * (may be a delay in launching instances, so initial task launch may fail).
+ *
+ * @param desiredTaskCount the number of tasks to ensure are running
+ */
+ public void runToMeetTargetTasks(int desiredTaskCount) {
+ int numRunningAndPendingTasks = taskCounts.getRunningAndPending();
+ LOGGER.info("Number of running and pending tasks is {}", numRunningAndPendingTasks);
+ int numberOfTasksToCreate = desiredTaskCount - numRunningAndPendingTasks;
+ // Instruct the scaler to ensure we have room for the correct number of running tasks. We use max() here
+ // because we are either at or need to grow to the desired number. If the desiredTaskCount is LESS than
+ // the number of running/pending tasks, we don't want to instruct the scaler to shrink the ECS cluster.
+ scaleToHostsAndLaunchTasks(Math.max(desiredTaskCount, numRunningAndPendingTasks), numberOfTasksToCreate, () -> false);
+ }
+
+ private void scaleToHostsAndLaunchTasks(int targetTasks, int createTasks, BooleanSupplier checkAbort) {
+ LOGGER.info("Target number of tasks is {}", targetTasks);
+ LOGGER.info("Tasks to create is {}", createTasks);
+ if (targetTasks < 0) {
+ throw new IllegalArgumentException("targetTasks is < 0");
+ }
+ hostScaler.scaleTo(targetTasks);
+ if (createTasks < 1) {
+ LOGGER.info("Finishing as no new tasks are needed");
+ return;
+ }
+ taskLauncher.launchTasks(createTasks, checkAbort);
+ }
+
+ /**
+ * Attempts to launch some tasks on ECS.
+ *
+ * @param ecsClient the Amazon ECS client
+ * @param instanceProperties the properties for instance
+ * @param containerName the name of the container
+ * @param clusterName the name for the cluster
+ * @param launchType the launch type either FARGATE or EC2
+ * @param fargateDefinitionFamily the fargate definition family
+ * @param numberOfTasksToCreate the number of tasks to create
+ * @param checkAbort a condition under which launching will be aborted
+ */
+ private static void launchTasksForCompaction(
+ EcsClient ecsClient, InstanceProperties instanceProperties, String containerName, String clusterName,
+ CompactionECSLaunchType launchType, String fargateDefinitionFamily, int numberOfTasksToCreate, BooleanSupplier checkAbort) {
+ builderForRunECSTasks(ecsClient, numberOfTasksToCreate, checkAbort)
+ .runTaskRequest(createCompactionTaskRequest(instanceProperties, containerName, clusterName, launchType, fargateDefinitionFamily))
+ .build().runTasks();
+ }
+
+ /**
+ * Attempts to launch some tasks on ECS.
+ *
+ * @param ecsClient the Amazon ECS client
+ * @param instanceProperties the properties for instance
+ * @param containerName the name of the container
+ * @param clusterName the name for the cluster
+ * @param launchType the launch type either FARGATE or EC2
+ * @param fargateDefinitionFamily the fargate definition family
+ * @param numberOfTasksToCreate the number of tasks to create
+ * @param checkAbort a condition under which launching will be aborted
+ */
+ private static void launchTasksForBulkExport(
+ EcsClient ecsClient, InstanceProperties instanceProperties, String containerName, String clusterName,
+ LaunchType launchType, String fargateDefinitionFamily, int numberOfTasksToCreate, BooleanSupplier checkAbort) {
+ builderForRunECSTasks(ecsClient, numberOfTasksToCreate, checkAbort)
+ .runTaskRequest(createBulkExportTaskRequest(instanceProperties, containerName, clusterName, fargateDefinitionFamily))
+ .build().runTasks();
+ }
+
+ /**
+ * Sets up the shared values in the run ECS tasks builder.
+ *
+ * @param ecsClient the Amazon ECS client
+ * @param numberOfTasksToCreate the number of tasks to create
+ * @param checkAbort a condition under which launching will be aborted
+ * @return the builder to have it's run task request added
+ */
+ private static RunECSTasks.Builder builderForRunECSTasks(EcsClient ecsClient, int numberOfTasksToCreate, BooleanSupplier checkAbort) {
+ return RunECSTasks.builder()
+ .ecsClient(ecsClient)
+ .numberOfTasksToCreate(numberOfTasksToCreate)
+ .checkAbort(checkAbort);
+ }
+
+ /**
+ * Creates a new compaction task request that can be passed to ECS.
+ *
+ * @param instanceProperties the instance properties
+ * @param containerName the name of the container
+ * @param clusterName the name for the cluster
+ * @param launchType the launch type either FARGATE or EC2
+ * @param fargateDefinitionFamily the fargate definition family
+ * @return the request for ECS
+ * @throws IllegalArgumentException if launchType is FARGATE and version is null
+ */
+ private static RunTaskRequest createCompactionTaskRequest(InstanceProperties instanceProperties, String containerName, String clusterName,
+ CompactionECSLaunchType launchType, String fargateDefinitionFamily) {
+
+ RunTaskRequest runTaskRequest;
+
+ if (launchType == CompactionECSLaunchType.FARGATE) {
+ runTaskRequest = buildFargateRequest(instanceProperties, containerName, clusterName, fargateDefinitionFamily);
+ } else if (launchType == CompactionECSLaunchType.EC2) {
+ runTaskRequest = builderForRunTaskRequest(instanceProperties, containerName, clusterName)
+ .launchType(LaunchType.EC2)
+ .taskDefinition(instanceProperties.get(COMPACTION_TASK_EC2_DEFINITION_FAMILY))
+ .build();
+ } else {
+ throw new IllegalArgumentException("Unrecognised ECS launch type: " + launchType);
+ }
+
+ return runTaskRequest;
+ }
+
+ /**
+ * Creates a new build export task request that can be passed to ECS.
+ *
+ * @param instanceProperties the instance properties
+ * @param containerName the name of the container
+ * @param clusterName the name for the cluster
+ * @param fargateDefinitionFamily the fargate definition family
+ * @return the request for ECS
+ * @throws IllegalArgumentException if launchType is FARGATE and version is null
+ */
+ private static RunTaskRequest createBulkExportTaskRequest(InstanceProperties instanceProperties, String containerName, String clusterName,
+ String fargateDefinitionFamily) {
+ return buildFargateRequest(instanceProperties, containerName, clusterName, fargateDefinitionFamily);
+ }
+
+ /**
+ * Builds a run task request for a fargate launch type.
+ *
+ * @param instanceProperties the instance properties
+ * @param containerName the name of the container
+ * @param clusterName the name for the cluster
+ * @param fargateDefinitionFamily the fargate definition family
+ * @return the request for ECS
+ * @throws IllegalArgumentException if launchType is FARGATE and version is null
+ */
+ private static RunTaskRequest buildFargateRequest(InstanceProperties instanceProperties, String containerName, String clusterName, String fargateDefinitionFamily) {
+ return builderForRunTaskRequest(instanceProperties, containerName, clusterName)
+ .launchType(LaunchType.FARGATE)
+ .platformVersion(Objects.requireNonNull(instanceProperties.get(FARGATE_VERSION), "fargateVersion cannot be null"))
+ .networkConfiguration(networkConfig(instanceProperties))
+ .taskDefinition(fargateDefinitionFamily)
+ .build();
+ }
+
+ /**
+ * Sets up the shared values in the run task request builder.
+ *
+ * @param instanceProperties the instance properties
+ * @param containerName the name of the container
+ * @param clusterName the name for the cluster
+ * @return the request for ECS
+ * @throws IllegalArgumentException if launchType is FARGATE and version is null
+ */
+ private static RunTaskRequest.Builder builderForRunTaskRequest(InstanceProperties instanceProperties, String containerName, String clusterName) {
+ return RunTaskRequest.builder()
+ .cluster(clusterName)
+ .overrides(createOverride(instanceProperties, containerName))
+ .propagateTags(PropagateTags.TASK_DEFINITION);
+ }
+
+ /**
+ * Create the container definition overrides for the task launch.
+ *
+ * @param instanceProperties the instance properties
+ * @param containerName the name of the container
+ * @return the container definition overrides
+ */
+ private static TaskOverride createOverride(InstanceProperties instanceProperties, String containerName) {
+ ContainerOverride containerOverride = ContainerOverride.builder()
+ .name(containerName)
+ .command(List.of(instanceProperties.get(CONFIG_BUCKET)))
+ .build();
+ return TaskOverride.builder()
+ .containerOverrides(containerOverride)
+ .build();
+ }
+
+ /**
+ * Create the container networking configuration.
+ *
+ * @param instanceProperties the instance properties
+ * @return task network configuration
+ */
+ private static NetworkConfiguration networkConfig(InstanceProperties instanceProperties) {
+ AwsVpcConfiguration vpcConfiguration = AwsVpcConfiguration.builder()
+ .subnets(instanceProperties.getList(SUBNETS))
+ .securityGroups(instanceProperties.getList(ECS_SECURITY_GROUPS))
+ .build();
+ return NetworkConfiguration.builder()
+ .awsvpcConfiguration(vpcConfiguration)
+ .build();
+ }
+
+ public static class Builder {
+ private TaskHostScaler hostScaler;
+ private String sqsJobQueueUrl;
+ private TaskCounts taskCounts;
+ private TaskLauncher taskLauncher;
+ private int maximumRunningTasks;
+
+ private Builder() {
+
+ }
+
+ public Builder hostScaler(TaskHostScaler hostScaler) {
+ this.hostScaler = hostScaler;
+ return this;
+ }
+
+ public Builder sqsJobQueueUrl(String sqsJobQueueUrl) {
+ this.sqsJobQueueUrl = sqsJobQueueUrl;
+ return this;
+ }
+
+ public Builder taskCounts(TaskCounts taskCounts) {
+ this.taskCounts = taskCounts;
+ return this;
+ }
+
+ public Builder taskLauncher(TaskLauncher taskLauncher) {
+ this.taskLauncher = taskLauncher;
+ return this;
+ }
+
+ public Builder maximumRunningTasks(int maximumRunningTasks) {
+ this.maximumRunningTasks = maximumRunningTasks;
+ return this;
+ }
+
+ public RunDataProcessingTasks build() {
+ return new RunDataProcessingTasks(this);
+ }
+ }
+}
diff --git a/java/common/common-task/src/main/java/sleeper/common/task/TaskHostScaler.java b/java/common/common-task/src/main/java/sleeper/common/task/TaskHostScaler.java
new file mode 100644
index 00000000000..183c5356a5e
--- /dev/null
+++ b/java/common/common-task/src/main/java/sleeper/common/task/TaskHostScaler.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2022-2025 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.common.task;
+
+public interface TaskHostScaler {
+ void scaleTo(int numberOfTasks);
+}
diff --git a/java/common/common-task/src/test/java/sleeper/common/task/RunCompactionTasksTest.java b/java/common/common-task/src/test/java/sleeper/common/task/RunDataProcessingTasksTest.java
similarity index 97%
rename from java/common/common-task/src/test/java/sleeper/common/task/RunCompactionTasksTest.java
rename to java/common/common-task/src/test/java/sleeper/common/task/RunDataProcessingTasksTest.java
index 5e94c842194..4d1203ea47a 100644
--- a/java/common/common-task/src/test/java/sleeper/common/task/RunCompactionTasksTest.java
+++ b/java/common/common-task/src/test/java/sleeper/common/task/RunDataProcessingTasksTest.java
@@ -24,8 +24,8 @@
import sleeper.common.task.CompactionTaskHostScaler.CheckInstanceType;
import sleeper.common.task.CompactionTaskHostScaler.InstanceType;
import sleeper.common.task.CompactionTaskHostScaler.SetDesiredInstances;
-import sleeper.common.task.RunCompactionTasks.TaskCounts;
-import sleeper.common.task.RunCompactionTasks.TaskLauncher;
+import sleeper.common.task.RunDataProcessingTasks.TaskCounts;
+import sleeper.common.task.RunDataProcessingTasks.TaskLauncher;
import sleeper.core.properties.instance.InstanceProperties;
import java.util.ArrayList;
@@ -48,7 +48,7 @@
import static sleeper.core.properties.instance.CompactionProperty.MAXIMUM_CONCURRENT_COMPACTION_TASKS;
import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties;
-public class RunCompactionTasksTest {
+public class RunDataProcessingTasksTest {
private final InstanceProperties instanceProperties = createTestInstanceProperties();
private final Map instanceTypes = new HashMap<>();
private final Map autoScalingGroupMaxSizeByName = new HashMap<>();
@@ -415,8 +415,12 @@ private void runToMeetTargetTasks(int requestedTasks, TaskCounts taskCounts) {
taskRunner(taskCounts).runToMeetTargetTasks(requestedTasks);
}
- private RunCompactionTasks taskRunner(TaskCounts taskCounts) {
- return new RunCompactionTasks(instanceProperties, taskCounts, hostScaler(), taskLauncher());
+ private RunDataProcessingTasks taskRunner(TaskCounts taskCounts) {
+ return RunDataProcessingTasks.builderForCompactions(instanceProperties)
+ .hostScaler(hostScaler())
+ .taskCounts(taskCounts)
+ .taskLauncher(taskLauncher())
+ .build();
}
private CompactionTaskHostScaler hostScaler() {
diff --git a/java/compaction/compaction-task-creation/src/main/java/sleeper/compaction/task/creation/RunCompactionTasksLambda.java b/java/compaction/compaction-task-creation/src/main/java/sleeper/compaction/task/creation/RunCompactionTasksLambda.java
index 8b8e9917f63..5686d856cf7 100644
--- a/java/compaction/compaction-task-creation/src/main/java/sleeper/compaction/task/creation/RunCompactionTasksLambda.java
+++ b/java/compaction/compaction-task-creation/src/main/java/sleeper/compaction/task/creation/RunCompactionTasksLambda.java
@@ -24,7 +24,7 @@
import software.amazon.awssdk.services.sqs.SqsClient;
import sleeper.common.task.QueueMessageCount;
-import sleeper.common.task.RunCompactionTasks;
+import sleeper.common.task.RunDataProcessingTasks;
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.core.properties.instance.InstanceProperties;
@@ -34,7 +34,7 @@
* A lambda function to start compaction tasks based on the number of queued compaction jobs.
*/
public class RunCompactionTasksLambda {
- private final RunCompactionTasks runTasks;
+ private final RunDataProcessingTasks runTasks;
private final QueueMessageCount.Client queueMessageCount;
public RunCompactionTasksLambda() {
@@ -45,7 +45,7 @@ public RunCompactionTasksLambda() {
AutoScalingClient asClient = AutoScalingClient.create();
Ec2Client ec2Client = Ec2Client.create();
InstanceProperties instanceProperties = S3InstanceProperties.loadFromBucket(s3Client, s3Bucket);
- this.runTasks = new RunCompactionTasks(instanceProperties, ecsClient, asClient, ec2Client);
+ this.runTasks = RunDataProcessingTasks.createForCompactions(instanceProperties, ecsClient, asClient, ec2Client);
this.queueMessageCount = QueueMessageCount.withSqsClient(sqsClient);
}
diff --git a/scripts/utility/runBulkExportTasks.sh b/scripts/utility/runBulkExportTasks.sh
new file mode 100755
index 00000000000..e88a81698de
--- /dev/null
+++ b/scripts/utility/runBulkExportTasks.sh
@@ -0,0 +1,34 @@
+#!/usr/bin/env bash
+# Copyright 2022-2025 Crown Copyright
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e
+unset CDPATH
+
+if [ "$#" -ne 2 ]; then
+ echo "Usage: $0 "
+ exit 1
+fi
+
+SCRIPTS_DIR=$(cd "$(dirname "$0")" && cd .. && pwd)
+
+TEMPLATE_DIR=${SCRIPTS_DIR}/templates
+JAR_DIR=${SCRIPTS_DIR}/jars
+
+VERSION=$(cat "${TEMPLATE_DIR}/version.txt")
+
+echo "-------------------------------------------------------"
+echo "Running bulk export tasks"
+echo "-------------------------------------------------------"
+java -cp "${JAR_DIR}/clients-${VERSION}-utility.jar" sleeper.common.task.RunBulkExportTasks "$@"