Skip to content
8 changes: 4 additions & 4 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ jobs:
COMMIT_MESSAGE="$(curl -s \
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
https://api.github.com/repos/${{ github.repository }}/commits/$COMMIT_SHA | jq -r '.commit.message')"
echo "Commit message=$(printf "%q" "$COMMIT_MESSAGE")" | head -n 1
echo "commit_message=$(printf "%q" "$COMMIT_MESSAGE")" | head -n 1 >> $GITHUB_OUTPUT
echo "Commit message=$(printf "%s" "$COMMIT_MESSAGE")" | head -n 1
echo "commit_message=$(printf "%s" "$COMMIT_MESSAGE")" | head -n 1 >> $GITHUB_OUTPUT
else
echo "GitHub event=${{ github.event_name }}"
COMMIT_MESSAGE="${{ github.event.head_commit.message }}"
echo "Commit message=$(printf "%q" "$COMMIT_MESSAGE")" | head -n 1
echo "commit_message=$(printf "%q" "$COMMIT_MESSAGE")" | head -n 1 >> $GITHUB_OUTPUT
echo "Commit message=$(printf "%s" "$COMMIT_MESSAGE")" | head -n 1
echo "commit_message=$(printf "%s" "$COMMIT_MESSAGE")" | head -n 1 >> $GITHUB_OUTPUT
fi

- name: Setup env
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import software.amazon.awssdk.services.batch.BatchClient
import software.amazon.awssdk.services.batch.model.DescribeComputeEnvironmentsRequest
import software.amazon.awssdk.services.batch.model.DescribeJobQueuesRequest
import software.amazon.awssdk.services.batch.model.DescribeJobsRequest
import software.amazon.awssdk.services.batch.model.EcsTaskDetails
import software.amazon.awssdk.services.batch.model.JobDetail
import software.amazon.awssdk.services.batch.model.TaskContainerDetails
import software.amazon.awssdk.services.ec2.Ec2Client
import software.amazon.awssdk.services.ec2.model.DescribeInstancesRequest
import software.amazon.awssdk.services.ec2.model.Instance
Expand Down Expand Up @@ -193,14 +196,55 @@ class AwsBatchHelper {
final response = batchClient.describeJobs(request)
if( response.jobs() ) {
final detail = response.jobs()[0]
return detail.container().logStreamName()
return getTaskContainer(detail)?.logStreamName()
?: detail.container()?.logStreamName()
}
else {
log.debug "Unable to find info for batch job id=$jobId"
return null
}
}

/**
* Retrieve the first EcsTaskDetails from the given JobDetail.
*
* In combination with {@code getTaskContainer(job)} this is analogous to {@code job.getContainer()}, but
* using the multi-container ECSProperties model.
*
* @param job
* @return
* The first EcsTaskDetails of the first TaskProperties, or {@code null}
*/
static EcsTaskDetails getTaskProperties(JobDetail job) {
try {
return job.ecsProperties().taskProperties().first()
} catch (Exception e) {
def jobId = job?.jobId() ?: '(unknown)'
log.debug "Unable to get container properties for batch job id=$jobId: ${e.getMessage()}"
return null
}
}

/**
* Retrieve the first TaskContainerDetails from the given JobDetail.
*
* In combination with {@code getTaskProperties(job)} this is analogous to {@code job.getContainer()}, but
* using the multi-container ECSProperties model.
*
* @param job
* @return
* The first TaskContainerDetails of the first TaskProperties, or {@code null}
*/
static TaskContainerDetails getTaskContainer(JobDetail job) {
try {
return getTaskProperties(job).containers().first()
} catch (Exception e) {
def jobId = job?.jobId() ?: '(unknown)'
log.debug "Unable to get container details for batch job id=$jobId: ${e.getMessage()}"
return null
}
}

/**
* Retrieve the cloudwatch logs for the specified AWS Batch Job ID
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package nextflow.cloud.aws.batch


import static nextflow.cloud.aws.batch.AwsBatchHelper.*
import static nextflow.cloud.aws.batch.AwsContainerOptionsMapper.*

import java.nio.file.Path
Expand Down Expand Up @@ -51,13 +51,13 @@ import nextflow.util.TestOnly
import software.amazon.awssdk.services.batch.BatchClient
import software.amazon.awssdk.services.batch.model.ArrayProperties
import software.amazon.awssdk.services.batch.model.AssignPublicIp
import software.amazon.awssdk.services.batch.model.AttemptContainerDetail
import software.amazon.awssdk.services.batch.model.AttemptDetail
import software.amazon.awssdk.services.batch.model.BatchException
import software.amazon.awssdk.services.batch.model.ClientException
import software.amazon.awssdk.services.batch.model.ContainerOverrides
import software.amazon.awssdk.services.batch.model.DescribeJobDefinitionsRequest
import software.amazon.awssdk.services.batch.model.DescribeJobDefinitionsResponse
import software.amazon.awssdk.services.batch.model.DescribeJobsRequest
import software.amazon.awssdk.services.batch.model.EcsPropertiesOverride
import software.amazon.awssdk.services.batch.model.EphemeralStorage
import software.amazon.awssdk.services.batch.model.EvaluateOnExit
import software.amazon.awssdk.services.batch.model.Host
Expand All @@ -79,6 +79,8 @@ import software.amazon.awssdk.services.batch.model.RetryStrategy
import software.amazon.awssdk.services.batch.model.RuntimePlatform
import software.amazon.awssdk.services.batch.model.SubmitJobRequest
import software.amazon.awssdk.services.batch.model.SubmitJobResponse
import software.amazon.awssdk.services.batch.model.TaskContainerOverrides
import software.amazon.awssdk.services.batch.model.TaskPropertiesOverride
import software.amazon.awssdk.services.batch.model.TerminateJobRequest
import software.amazon.awssdk.services.batch.model.Volume
/**
Expand Down Expand Up @@ -246,7 +248,8 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
checkIfUnschedulable(job)
// fetch the task arn
if( !taskArn )
taskArn = job?.container()?.taskArn()
taskArn = getTaskProperties(job)?.taskRoleArn()
?: job?.container()?.taskArn()
return result
}

Expand Down Expand Up @@ -282,12 +285,20 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
final result = new ArrayList(2)
if( job.statusReason() )
result.add(job.statusReason())
final AttemptContainerDetail container = job.attempts() ? job.attempts()[-1].container() : null
if( container?.reason() )
result.add(container.reason())
final attemptDetail = job.attempts() ? job.attempts()[-1] : null
final reason = errReasonFromAttempt (attemptDetail)
if( reason )
result.add(reason)
return result.join(' - ')
}

private String errReasonFromAttempt(AttemptDetail attempt){
if( !attempt )
return null
return attempt.taskProperties()?.first()?.containers()?.first()?.reason()
?: attempt.container()?.reason()
}

/**
* {@inheritDoc}
*/
Expand All @@ -307,7 +318,9 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
// take the exit code from the `.exitcode` file create by nextflow
// the rationale of this is that, in case of error, the exit code return
// by the batch API is more reliable.
task.exitStatus = job.container().exitCode() ?: readExitFile()
task.exitStatus = getTaskContainer(job)?.exitCode()
?: job.container()?.exitCode()
?: readExitFile()
// finalize the task
task.stdout = outputFile
if( job?.status() == JobStatus.FAILED || task.exitStatus==Integer.MAX_VALUE ) {
Expand Down Expand Up @@ -818,7 +831,7 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job

// set the actual command
final resources = new ArrayList<ResourceRequirement>(5)
final container = ContainerOverrides.builder()
final container = TaskContainerOverrides.builder()
container.command(getSubmitCommand())
// set the task memory
final cpus = task.config.getCpus()
Expand Down Expand Up @@ -849,7 +862,13 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
if( vars )
container.environment(vars)

builder.containerOverrides(container.build())
builder.ecsPropertiesOverride(EcsPropertiesOverride.builder()
.taskProperties(TaskPropertiesOverride.builder()
.containers(container.build())
.build()
)
.build()
)

// set the array properties
if( task instanceof TaskArrayRun ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package nextflow.cloud.aws.batch.model

import groovy.transform.CompileStatic
import software.amazon.awssdk.services.batch.model.ContainerProperties
import software.amazon.awssdk.services.batch.model.EcsTaskProperties
import software.amazon.awssdk.services.batch.model.EphemeralStorage
import software.amazon.awssdk.services.batch.model.KeyValuePair
import software.amazon.awssdk.services.batch.model.LinuxParameters
Expand All @@ -27,13 +27,14 @@ import software.amazon.awssdk.services.batch.model.NetworkConfiguration
import software.amazon.awssdk.services.batch.model.ResourceRequirement
import software.amazon.awssdk.services.batch.model.RuntimePlatform
import software.amazon.awssdk.services.batch.model.Secret
import software.amazon.awssdk.services.batch.model.TaskContainerProperties
import software.amazon.awssdk.services.batch.model.Ulimit
import software.amazon.awssdk.services.batch.model.Volume

/**
* Models the container properties used to configure an AWS Batch job.
*
* This is a mutable version of {@link ContainerProperties} required
* This is a mutable version of {@link TaskContainerProperties} required
* to simplify the extension of container settings in the AWS Batch executor
* and its sub-classes (e.g. nf-xpack).
*
Expand Down Expand Up @@ -236,14 +237,11 @@ class ContainerPropertiesModel {
return runtimePlatform
}

ContainerProperties toBatchContainerProperties() {
def builder = ContainerProperties.builder()

EcsTaskProperties toBatchContainerProperties() {
def builder = TaskContainerProperties.builder();
if (image) builder.image(image)
if (command) builder.command(command)
if (resourceRequirements) builder.resourceRequirements(resourceRequirements)
if (jobRoleArn) builder.jobRoleArn(jobRoleArn)
if (executionRoleArn) builder.executionRoleArn(executionRoleArn)
if (linuxParameters) builder.linuxParameters(linuxParameters)
if (environment) builder.environment(environment)
if (privileged) builder.privileged(privileged)
Expand All @@ -252,12 +250,16 @@ class ContainerPropertiesModel {
if (ulimits) builder.ulimits(ulimits)
if (logConfiguration) builder.logConfiguration(logConfiguration)
if (mountPoints) builder.mountPoints(mountPoints)
if (volumes) builder.volumes(volumes)
if (networkConfiguration) builder.networkConfiguration(networkConfiguration)
if (ephemeralStorage) builder.ephemeralStorage(ephemeralStorage)
if (runtimePlatform) builder.runtimePlatform(runtimePlatform)
def ecsTaskBuilder = EcsTaskProperties.builder()
ecsTaskBuilder.containers(builder.build())
if (jobRoleArn) ecsTaskBuilder.taskRoleArn(jobRoleArn)
if (executionRoleArn) ecsTaskBuilder.executionRoleArn(executionRoleArn)
if (volumes) ecsTaskBuilder.volumes(volumes)
if (networkConfiguration) ecsTaskBuilder.networkConfiguration(networkConfiguration)
if (ephemeralStorage) ecsTaskBuilder.ephemeralStorage(ephemeralStorage)
if (runtimePlatform) ecsTaskBuilder.runtimePlatform(runtimePlatform)

return builder.build()
return ecsTaskBuilder.build()
}

@Override
Expand All @@ -280,6 +282,7 @@ class ContainerPropertiesModel {
", networkConfiguration=" + networkConfiguration +
", ephemeralStorage=" + ephemeralStorage +
", runtimePlatform=" + runtimePlatform +
", ecsMode=true" + //Added to generate a different token if the job definition was generated with older containerProperties
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package nextflow.cloud.aws.batch.model


import groovy.transform.CompileStatic
import software.amazon.awssdk.services.batch.model.EcsProperties
import software.amazon.awssdk.services.batch.model.JobDefinitionType
import software.amazon.awssdk.services.batch.model.PlatformCapability
import software.amazon.awssdk.services.batch.model.RegisterJobDefinitionRequest
Expand Down Expand Up @@ -116,7 +117,10 @@ class RegisterJobDefinitionModel {
if (platformCapabilities)
builder.platformCapabilities(platformCapabilities)
if (containerProperties)
builder.containerProperties(containerProperties.toBatchContainerProperties())
builder.ecsProperties(EcsProperties.builder()
.taskProperties(containerProperties.toBatchContainerProperties())
.build()
)
if (parameters)
builder.parameters(parameters)
if (tags)
Expand Down
Loading