Skip to content

Commit ea1c1b7

Browse files
Add Google Batch native retry on spot termination (#4500)
This commit adds support for the automatic retry performed by Google Batch when a spot reclaim terminates a job. This feature is controlled by the setting: ``` google.batch.maxSpotAttempts = <number> ``` By default, the execution is retried up to 5 times. When 0 is specified, the job is not retried on a spot reclaim. Signed-off-by: Ben Sherman <[email protected]> Signed-off-by: Paolo Di Tommaso <[email protected]> Co-authored-by: Paolo Di Tommaso <[email protected]>
1 parent c271bb1 commit ea1c1b7

File tree

4 files changed

+39
-8
lines changed

4 files changed

+39
-8
lines changed

docs/config.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,11 @@ The following settings are available for Google Cloud Batch:
775775
`google.location`
776776
: The Google Cloud location where jobs are executed (default: `us-central1`).
777777

778+
`google.batch.maxSpotAttempts`
779+
: :::{versionadded} 23.11.0-edge
780+
:::
781+
: Maximum number of times a job is automatically retried after being interrupted by a Compute Engine spot reclaim event (default: `5`). Set to `0` to disable automatic retries on spot reclaim.
782+
778783
`google.project`
779784
: The Google Cloud project ID to use for pipeline execution
780785

plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import com.google.cloud.batch.v1.AllocationPolicy
2424
import com.google.cloud.batch.v1.ComputeResource
2525
import com.google.cloud.batch.v1.Environment
2626
import com.google.cloud.batch.v1.Job
27+
import com.google.cloud.batch.v1.LifecyclePolicy
2728
import com.google.cloud.batch.v1.LogsPolicy
2829
import com.google.cloud.batch.v1.Runnable
2930
import com.google.cloud.batch.v1.ServiceAccount
@@ -207,6 +208,23 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
207208
)
208209
.addAllVolumes( launcher.getVolumes() )
209210

211+
// retry on spot reclaim
212+
if( executor.config.maxSpotAttempts ) {
213+
// Note: Google Batch uses the special exit status 50001 to signal
214+
// the execution was terminated due to a spot reclaim. When this happens
215+
// the policy re-executes the job automatically up to `maxSpotAttempts` times
216+
taskSpec
217+
.setMaxRetryCount( executor.config.maxSpotAttempts )
218+
.addLifecyclePolicies(
219+
LifecyclePolicy.newBuilder()
220+
.setActionCondition(
221+
LifecyclePolicy.ActionCondition.newBuilder()
222+
.addExitCodes(50001)
223+
)
224+
.setAction(LifecyclePolicy.Action.RETRY_TASK)
225+
)
226+
}
227+
210228
// instance policy
211229
final allocationPolicy = AllocationPolicy.newBuilder()
212230
final instancePolicyOrTemplate = AllocationPolicy.InstancePolicyOrTemplate.newBuilder()

plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ class BatchConfig {
3737
private List<String> allowedLocations
3838
private MemoryUnit bootDiskSize
3939
private String cpuPlatform
40-
private boolean spot
40+
private int maxSpotAttempts
4141
private boolean preemptible
42+
private boolean spot
4243
private boolean usePrivateAddress
4344
private String network
4445
private String subnetwork
@@ -49,6 +50,7 @@ class BatchConfig {
4950
List<String> getAllowedLocations() { allowedLocations }
5051
MemoryUnit getBootDiskSize() { bootDiskSize }
5152
String getCpuPlatform() { cpuPlatform }
53+
int getMaxSpotAttempts() { maxSpotAttempts }
5254
boolean getPreemptible() { preemptible }
5355
boolean getSpot() { spot }
5456
boolean getUsePrivateAddress() { usePrivateAddress }
@@ -63,8 +65,9 @@ class BatchConfig {
6365
result.allowedLocations = session.config.navigate('google.batch.allowedLocations', List.of()) as List<String>
6466
result.bootDiskSize = session.config.navigate('google.batch.bootDiskSize') as MemoryUnit
6567
result.cpuPlatform = session.config.navigate('google.batch.cpuPlatform')
66-
result.spot = session.config.navigate('google.batch.spot',false)
68+
result.maxSpotAttempts = session.config.navigate('google.batch.maxSpotAttempts',5) as int
6769
result.preemptible = session.config.navigate('google.batch.preemptible',false)
70+
result.spot = session.config.navigate('google.batch.spot',false)
6871
result.usePrivateAddress = session.config.navigate('google.batch.usePrivateAddress',false)
6972
result.network = session.config.navigate('google.batch.network')
7073
result.subnetwork = session.config.navigate('google.batch.subnetwork')

plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ class GoogleBatchTaskHandlerTest extends Specification {
130130
getAllowedLocations() >> ['zones/us-central1-a', 'zones/us-central1-c']
131131
getBootDiskSize() >> BOOT_DISK
132132
getCpuPlatform() >> CPU_PLATFORM
133+
getMaxSpotAttempts() >> 5
133134
getSpot() >> true
134135
getNetwork() >> 'net-1'
135136
getServiceAccountEmail() >> '[email protected]'
@@ -170,16 +171,20 @@ class GoogleBatchTaskHandlerTest extends Specification {
170171

171172
and:
172173
def taskGroup = req.getTaskGroups(0)
173-
def runnable = taskGroup.getTaskSpec().getRunnables(0)
174+
def taskSpec = taskGroup.getTaskSpec()
175+
def runnable = taskSpec.getRunnables(0)
174176
def allocationPolicy = req.getAllocationPolicy()
175177
def instancePolicy = allocationPolicy.getInstances(0).getPolicy()
176178
def networkInterface = allocationPolicy.getNetwork().getNetworkInterfaces(0)
177179
and:
178-
taskGroup.getTaskSpec().getComputeResource().getBootDiskMib() == BOOT_DISK.toMega()
179-
taskGroup.getTaskSpec().getComputeResource().getCpuMilli() == CPUS * 1_000
180-
taskGroup.getTaskSpec().getComputeResource().getMemoryMib() == MEM.toMega()
181-
taskGroup.getTaskSpec().getMaxRunDuration().getSeconds() == TIMEOUT.seconds
182-
taskGroup.getTaskSpec().getVolumes(0).getMountPath() == '/tmp'
180+
taskSpec.getComputeResource().getBootDiskMib() == BOOT_DISK.toMega()
181+
taskSpec.getComputeResource().getCpuMilli() == CPUS * 1_000
182+
taskSpec.getComputeResource().getMemoryMib() == MEM.toMega()
183+
taskSpec.getMaxRunDuration().getSeconds() == TIMEOUT.seconds
184+
taskSpec.getVolumes(0).getMountPath() == '/tmp'
185+
taskSpec.getMaxRetryCount() == 5
186+
taskSpec.getLifecyclePolicies(0).getActionCondition().getExitCodes(0) == 50001
187+
taskSpec.getLifecyclePolicies(0).getAction().toString() == 'RETRY_TASK'
183188
and:
184189
runnable.getContainer().getCommandsList().join(' ') == '/bin/bash -o pipefail -c bash .command.run'
185190
runnable.getContainer().getImageUri() == CONTAINER_IMAGE

0 commit comments

Comments
 (0)