Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import sleeper.core.util.ThreadSleep;

import java.util.Map;
import java.util.function.Consumer;

import static sleeper.core.util.RateLimitUtils.sleepForSustainedRatePerSecond;

Expand Down Expand Up @@ -62,24 +63,31 @@ public void handleEvent(
case "Update":
break;
case "Delete":
stopTasks(clusterName);
stopTasks(ecsClient, clusterName, maxResults, sleep);
break;
default:
throw new IllegalArgumentException("Invalid request type: " + event.getRequestType());
}
}

private void stopTasks(String clusterName) {
ecsClient.listTasksPaginator(builder -> builder.cluster(clusterName).maxResults(maxResults))
.taskArns()
private static void stopTasks(EcsClient ecs, String clusterName, int maxResults, ThreadSleep sleep) {
LOGGER.info("Stopping tasks for ECS cluster {}", clusterName);
forEachTaskArn(ecs, clusterName, maxResults, taskArn -> {
// Rate limit for ECS StopTask is 100 burst, 40 sustained:
// https://docs.aws.amazon.com/AmazonECS/latest/APIReference/request-throttling.html
sleepForSustainedRatePerSecond(30, sleep);
ecs.stopTask(builder -> builder.cluster(clusterName).task(taskArn)
.reason("Cleaning up before cdk destroy"));
LOGGER.info("Stopped task {} in ECS cluster {}", taskArn, clusterName);
});
}

private static void forEachTaskArn(EcsClient ecs, String clusterName, int maxResults, Consumer<String> consumer) {
ecs.listTasksPaginator(builder -> builder.cluster(clusterName).maxResults(maxResults))
.stream()
.forEach(
task -> {
LOGGER.info("Stopping task {} in cluster {} ", task, clusterName);
// Rate limit for ECS StopTask is 100 burst, 40 sustained:
// https://docs.aws.amazon.com/AmazonECS/latest/APIReference/request-throttling.html
sleepForSustainedRatePerSecond(30, sleep);
ecsClient.stopTask(builder -> builder.cluster(clusterName).task(task));
});
.peek(response -> LOGGER.info("Found {} tasks", response.taskArns().size()))
.flatMap(response -> response.taskArns().stream())
.forEach(consumer);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public enum LogGroupRef {
BULK_IMPORT_AUTODELETE("bulk-import-autodelete"),
BULK_IMPORT_AUTODELETE_PROVIDER("bulk-import-autodelete-provider"),
INGEST_TASKS("IngestTasks"),
INGEST_TASKS_AUTOSTOP("ingest-tasks-autostop"),
INGEST_TASKS_AUTOSTOP_PROVIDER("ingest-tasks-autostop-provider"),
INGEST_CREATE_TASKS("ingest-create-tasks"),
INGEST_BATCHER_SUBMIT_FILES("ingest-batcher-submit-files"),
INGEST_BATCHER_CREATE_JOBS("ingest-batcher-create-jobs"),
Expand Down
13 changes: 11 additions & 2 deletions java/cdk/src/main/java/sleeper/cdk/stack/ingest/IngestStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import sleeper.cdk.jars.LambdaCode;
import sleeper.cdk.stack.core.CoreStacks;
import sleeper.cdk.stack.core.LoggingStack.LogGroupRef;
import sleeper.cdk.util.AutoStopEcsClusterTasks;
import sleeper.cdk.util.Utils;
import sleeper.core.deploy.DockerDeployment;
import sleeper.core.deploy.LambdaHandler;
Expand Down Expand Up @@ -109,6 +110,7 @@ public IngestStack(
// - A lambda that periodically checks the number of running ingest tasks
// and if there are not enough (i.e. there is a backlog on the queue
// then it creates more tasks).
// - A lambda that stops task when a delete cluster event is triggered.

IBucket jarsBucket = Bucket.fromBucketName(this, "JarsBucket", jars.bucketName());
LambdaCode lambdaCode = jars.lambdaCode(jarsBucket);
Expand All @@ -117,7 +119,7 @@ public IngestStack(
sqsQueueForIngestJobs(coreStacks, topic, errorMetrics);

// ECS cluster for ingest tasks
ecsClusterForIngestTasks(jarsBucket, coreStacks, ingestJobQueue);
ecsClusterForIngestTasks(jarsBucket, coreStacks, ingestJobQueue, lambdaCode);

// Lambda to create ingest tasks
lambdaToCreateIngestTasks(coreStacks, ingestJobQueue, lambdaCode);
Expand Down Expand Up @@ -181,7 +183,9 @@ private Queue sqsQueueForIngestJobs(CoreStacks coreStacks, Topic topic, List<IMe
private Cluster ecsClusterForIngestTasks(
IBucket jarsBucket,
CoreStacks coreStacks,
Queue ingestJobQueue) {
Queue ingestJobQueue,
LambdaCode lambdaCode) {

VpcLookupOptions vpcLookupOptions = VpcLookupOptions.builder()
.vpcId(instanceProperties.get(VPC_ID))
.build();
Expand Down Expand Up @@ -236,6 +240,11 @@ private Cluster ecsClusterForIngestTasks(
.build();
new CfnOutput(this, INGEST_CONTAINER_ROLE_ARN, ingestRoleARNProps);

AutoStopEcsClusterTasks.autoStopTasksOnEcsCluster(this, instanceProperties, lambdaCode,
cluster, clusterName,
coreStacks.getLogGroup(LogGroupRef.INGEST_TASKS_AUTOSTOP),
coreStacks.getLogGroup(LogGroupRef.INGEST_TASKS_AUTOSTOP_PROVIDER));

return cluster;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2022-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.cdk.util;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import software.amazon.awscdk.CustomResource;
import software.amazon.awscdk.Duration;
import software.amazon.awscdk.customresources.Provider;
import software.amazon.awscdk.services.ecs.ICluster;
import software.amazon.awscdk.services.iam.IRole;
import software.amazon.awscdk.services.iam.ManagedPolicy;
import software.amazon.awscdk.services.iam.PolicyStatement;
import software.amazon.awscdk.services.lambda.IFunction;
import software.amazon.awscdk.services.logs.ILogGroup;
import software.constructs.Construct;

import sleeper.cdk.jars.LambdaCode;
import sleeper.core.deploy.LambdaHandler;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.util.EnvironmentUtils;

import java.util.List;
import java.util.Map;
import java.util.Objects;

@SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
public class AutoStopEcsClusterTasks {

private AutoStopEcsClusterTasks() {
}

public static void autoStopTasksOnEcsCluster(
Construct scope, InstanceProperties instanceProperties, LambdaCode lambdaCode,
ICluster cluster, String clusterName,
ILogGroup logGroup,
ILogGroup providerLogGroup) {

String id = cluster.getNode().getId() + "-AutoStop";
String functionName = clusterName + "-autostop";

IFunction lambda = lambdaCode.buildFunction(scope, LambdaHandler.AUTO_STOP_ECS_CLUSTER_TASKS, id + "Lambda", builder -> builder
.functionName(functionName)
.memorySize(2048)
.environment(EnvironmentUtils.createDefaultEnvironmentNoConfigBucket(instanceProperties))
.description("Lambda for auto-stopping ECS tasks")
.logGroup(logGroup)
.timeout(Duration.minutes(10)));

// Grant this function permission to list tasks and stop tasks
PolicyStatement policyStatement = PolicyStatement.Builder
.create()
.resources(List.of("*"))
.actions(List.of("ecs:ListTasks", "ecs:StopTask", "iam:PassRole"))
.build();
IRole role = Objects.requireNonNull(lambda.getRole());
role.addToPrincipalPolicy(policyStatement);
role.addManagedPolicy(ManagedPolicy.fromAwsManagedPolicyName("service-role/AmazonECSTaskExecutionRolePolicy"));

Provider propertiesWriterProvider = Provider.Builder.create(scope, id + "Provider")
.onEventHandler(lambda)
.logGroup(providerLogGroup)
.build();

CustomResource customResource = CustomResource.Builder.create(scope, id)
.resourceType("Custom::AutoStopEcsClusterTasks")
.properties(Map.of("cluster", clusterName))
.serviceToken(propertiesWriterProvider.getServiceToken())
.build();

customResource.getNode().addDependency(cluster);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public class LambdaHandler {
.jar(LambdaJar.CUSTOM_RESOURCES)
.handler("sleeper.cdk.custom.AutoDeleteS3ObjectsLambda::handleEvent")
.core().add();
public static final LambdaHandler AUTO_STOP_ECS_CLUSTER_TASKS = builder()
.jar(LambdaJar.CUSTOM_RESOURCES)
.handler("sleeper.cdk.custom.AutoStopEcsClusterTasksLambda::handleEvent")
.core().add();
public static final LambdaHandler PROPERTIES_WRITER = builder()
.jar(LambdaJar.CUSTOM_RESOURCES)
.handler("sleeper.cdk.custom.PropertiesWriterLambda::handleEvent")
Expand Down
Loading