Skip to content

Commit 0eee07f

Browse files
committed
Fix rescaling PEs
1 parent 28afba3 commit 0eee07f

File tree

1 file changed

+12
-9
lines changed

1 file changed

+12
-9
lines changed

pkg/controller/mpi_job_controller.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ func NewMPIJobControllerWithClock(
468468
oldExpandReplicas: make(map[string]int32),
469469
runningJobs: pqRunning,
470470
queuedJobs: pqQueued,
471-
freeSlots: 60,
471+
freeSlots: 10,
472472
rescaleGap: 1 * time.Second, // currently 1 second; an earlier note here said 3 minutes — confirm the intended gap
473473
}
474474
// FIXME: freeSlots is a hard-coded literal in the struct above; fix the free slots!
@@ -1534,10 +1534,12 @@ func (c *MPIJobController) calculateWorkerReplicas(mpiJob *kubeflow.MPIJob) (int
15341534
klog.Infof("Queued job 1 %s, %d", getJobKey(mpiJob), numWorkersToFree)
15351535
return 0, jobQueuedError
15361536
} else {
1537-
numWorkersToFree = *worker.MinReplicas - int32(c.freeSlots) + 1
1538-
index = len(c.runningJobs) - 1
1537+
//numWorkersToFree = *worker.MinReplicas - int32(c.freeSlots) + 1
1538+
maxWorkersToFree := *worker.MaxReplicas - int32(c.freeSlots) + 1
1539+
minWorkersToFree := *worker.MinReplicas - int32(c.freeSlots) + 1
1540+
index := len(c.runningJobs) - 1
15391541
for {
1540-
if numWorkersToFree == 0 || index < 0 {
1542+
if maxWorkersToFree == 0 || index < 0 {
15411543
break
15421544
}
15431545

@@ -1561,7 +1563,7 @@ func (c *MPIJobController) calculateWorkerReplicas(mpiJob *kubeflow.MPIJob) (int
15611563
jobMinReplicas := *it.mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].MinReplicas
15621564
if int32(len(workerPodList)) > jobMinReplicas && c.lastAction[getJobKey(&it.mpiJob)].Add(c.rescaleGap).Before(c.clock.Now()) {
15631565
newPodCount := int32(math.Max(float64(jobMinReplicas),
1564-
float64(len(workerPodList)-int(numWorkersToFree))))
1566+
float64(len(workerPodList)-int(maxWorkersToFree))))
15651567
klog.Infof("Setting replicas for %s to %d", getJobKey(&it.mpiJob), newPodCount)
15661568

15671569
err := c.sendRescaleSignal(&it.mpiJob, int32(len(workerPodList)), newPodCount)
@@ -1572,20 +1574,21 @@ func (c *MPIJobController) calculateWorkerReplicas(mpiJob *kubeflow.MPIJob) (int
15721574
}
15731575

15741576
c.latestReplicas[getJobKey(&it.mpiJob)] = newPodCount
1575-
numWorkersToFree -= int32(len(workerPodList) - int(newPodCount))
1577+
maxWorkersToFree -= int32(len(workerPodList) - int(newPodCount))
1578+
minWorkersToFree -= int32(len(workerPodList) - int(newPodCount))
15761579
c.freeSlots += len(workerPodList) - int(newPodCount)
15771580

15781581
c.queue.AddRateLimited(getJobKey(&it.mpiJob))
15791582
}
15801583
}
1581-
if numWorkersToFree > 0 {
1584+
if minWorkersToFree > 0 {
15821585
// queue this job
15831586
//c.enqueueJobInternal(mpiJob)
1584-
klog.Infof("Queued job 2 %s, %d", getJobKey(mpiJob), numWorkersToFree)
1587+
klog.Infof("Queued job 2 %s, %d", getJobKey(mpiJob), minWorkersToFree)
15851588
return 0, jobQueuedError
15861589
}
15871590
}
1588-
return *worker.MinReplicas, nil
1591+
return int32(c.freeSlots) - 1, nil
15891592
} else {
15901593
return replicas, nil
15911594
}

0 commit comments

Comments
 (0)