Skip to content

Commit fcf5b87

Browse files
authored
Fix Job arrays exceeding queueSize (#6314)

---------

Signed-off-by: jorgee <[email protected]>
Signed-off-by: Ben Sherman <[email protected]>
1 parent 9a70cdb commit fcf5b87

File tree

5 files changed, with 81 additions (+81) and 7 deletions (-7).

modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ class ParallelPollingMonitor extends TaskPollingMonitor {
5555

5656
@Override
5757
protected boolean canSubmit(TaskHandler handler) {
58-
return super.canSubmit(handler) && (semaphore == null || semaphore.tryAcquire())
58+
return super.canSubmit(handler) && (semaphore == null || semaphore.tryAcquire(handler.getForksCount()))
5959
}
6060

6161
protected RateLimiter createSubmitRateLimit() {
@@ -95,7 +95,7 @@ class ParallelPollingMonitor extends TaskPollingMonitor {
9595

9696
@Override
9797
boolean evict(TaskHandler handler) {
98-
semaphore?.release()
98+
semaphore?.release(handler.getForksCount())
9999
return super.evict(handler)
100100
}
101101
}

modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy

Lines changed: 14 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -254,6 +254,13 @@ abstract class TaskHandler {
254254
return record
255255
}
256256

257+
/**
258+
* Determine the number of forks consumed by the task handler.
259+
*/
260+
int getForksCount() {
261+
return task instanceof TaskArrayRun ? task.getArraySize() : 1
262+
}
263+
257264
/**
258265
* Determine if a process can be forked i.e. can launch
259266
* a parallel task execution. This is only enforced when
@@ -266,7 +273,7 @@ abstract class TaskHandler {
266273
*/
267274
boolean canForkProcess() {
268275
final max = task.processor.maxForks
269-
return !max ? true : task.processor.forksCount < max
276+
return !max ? true : task.processor.forksCount + getForksCount() <= max
270277
}
271278

272279
/**
@@ -283,14 +290,18 @@ abstract class TaskHandler {
283290
* Increment the number of current forked processes
284291
*/
285292
final void incProcessForks() {
286-
task.processor.forksCount?.increment()
293+
def count = task.processor.forksCount
294+
if( count != null )
295+
count.add(getForksCount())
287296
}
288297

289298
/**
290299
* Decrement the number of current forked processes
291300
*/
292301
final void decProcessForks() {
293-
task.processor.forksCount?.decrement()
302+
def count = task.processor.forksCount
303+
if( count != null )
304+
count.add(-getForksCount())
294305
}
295306

296307
/**

modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -198,7 +198,12 @@ class TaskPollingMonitor implements TaskMonitor {
198198
* by the polling monitor
199199
*/
200200
protected boolean canSubmit(TaskHandler handler) {
201-
(capacity>0 ? runningQueue.size() < capacity : true) && handler.canForkProcess() && handler.isReady()
201+
int slots = handler.getForksCount()
202+
if( capacity > 0 && slots > capacity )
203+
throw new IllegalArgumentException("Job array ${handler.task.name} exceeds the queue size (array size: $slots, queue size: $capacity)")
204+
if( capacity > 0 && runningQueue.size() + slots > capacity )
205+
return false
206+
return handler.canForkProcess() && handler.isReady()
202207
}
203208

204209
/**

modules/nextflow/src/test/groovy/nextflow/processor/ParallelPollingMonitorTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -83,7 +83,7 @@ class ParallelPollingMonitorTest extends Specification {
8383
def retry = new AtomicInteger()
8484

8585
def session = Mock(Session)
86-
def handler = Mock(TaskHandler)
86+
def handler = Spy(TaskHandler)
8787

8888
def opts = new ThrottlingExecutor.Options()
8989
.retryOn(IllegalArgumentException)

modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy

Lines changed: 58 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,11 +16,14 @@
1616

1717
package nextflow.processor
1818

19+
import java.util.concurrent.atomic.LongAdder
1920

2021
import nextflow.Session
2122
import nextflow.util.Duration
2223
import nextflow.util.RateUnit
24+
import nextflow.util.ThrottlingExecutor
2325
import spock.lang.Specification
26+
import spock.lang.Unroll
2427
/**
2528
*
2629
* @author Paolo Di Tommaso <[email protected]>
@@ -148,4 +151,59 @@ class TaskPollingMonitorTest extends Specification {
148151
3 * session.notifyTaskSubmit(handler)
149152
}
150153

154+
@Unroll
155+
def 'should check whether job can be submitted' () {
156+
given:
157+
def session = Mock(Session)
158+
def monitor = Spy(new TaskPollingMonitor(name: 'foo', session: session, capacity: CAPACITY, pollInterval: Duration.of('1min')))
159+
and:
160+
def processor = Mock(TaskProcessor) {
161+
getForksCount() >> new LongAdder()
162+
getMaxForks() >> FORKS
163+
}
164+
def handler = Spy(TaskHandler)
165+
handler.task = Mock(TaskRun) {
166+
getProcessor() >> processor
167+
}
168+
def arrayHandler = Spy(TaskHandler) {
169+
isReady() >> true
170+
}
171+
arrayHandler.task = Mock(TaskArrayRun) {
172+
getArraySize() >> ARRAY
173+
getProcessor() >> processor
174+
}
175+
176+
and:
177+
SUBMIT.times { monitor.runningQueue.add(handler) }
178+
179+
expect:
180+
monitor.runningQueue.size() == SUBMIT
181+
monitor.canSubmit(arrayHandler) == EXPECTED
182+
where:
183+
CAPACITY | SUBMIT | FORKS | ARRAY | EXPECTED
184+
0 | 0 | 0 | 2 | true
185+
0 | 0 | 1 | 2 | false
186+
10 | 5 | 0 | 5 | true
187+
10 | 8 | 0 | 5 | false
188+
10 | 0 | 1 | 5 | false
189+
}
190+
191+
def 'should throw error if job array size exceeds queue size' () {
192+
given:
193+
def session = Mock(Session)
194+
def monitor = Spy(new TaskPollingMonitor(name: 'foo', session: session, capacity: 2, pollInterval: Duration.of('1min')))
195+
and:
196+
def arrayHandler = Spy(TaskHandler)
197+
arrayHandler.task = Mock(TaskArrayRun) {
198+
getName() >> 'bar'
199+
getArraySize() >> 4
200+
}
201+
202+
when:
203+
monitor.canSubmit(arrayHandler)
204+
then:
205+
def e = thrown(IllegalArgumentException)
206+
e.message == 'Job array bar exceeds the queue size (array size: 4, queue size: 2)'
207+
}
208+
151209
}

0 commit comments

Comments
 (0)