
Commit a45be3c

Merge pull request #3200 from corhere/fix-scheduler-placementpref-infinite-loop
manager: fix task scheduler infinite loop
2 parents 8c19597 + 2d6aff7 commit a45be3c

3 files changed: +318 −1 lines changed


manager/scheduler/nodeset_test.go

Lines changed: 163 additions & 0 deletions
@@ -0,0 +1,163 @@
package scheduler

import (
    "testing"

    "github.com/moby/swarmkit/v2/api"
)

func TestTreeTaskCountConsistency(t *testing.T) {
    // Create a nodeSet with some test nodes
    ns := &nodeSet{nodes: make(map[string]NodeInfo)}

    // Add test nodes with different labels and task counts
    nodes := []NodeInfo{
        {
            Node: &api.Node{
                ID: "node1",
                Spec: api.NodeSpec{
                    Annotations: api.Annotations{
                        Labels: map[string]string{"datacenter": "dc1", "rack": "r1"},
                    },
                },
            },
            ActiveTasksCountByService: map[string]int{"service1": 3},
        },
        {
            Node: &api.Node{
                ID: "node2",
                Spec: api.NodeSpec{
                    Annotations: api.Annotations{
                        Labels: map[string]string{"datacenter": "dc1", "rack": "r2"},
                    },
                },
            },
            ActiveTasksCountByService: map[string]int{"service1": 2},
        },
        {
            Node: &api.Node{
                ID: "node3",
                Spec: api.NodeSpec{
                    Annotations: api.Annotations{
                        Labels: map[string]string{"datacenter": "dc2", "rack": "r2"},
                    },
                },
            },
            ActiveTasksCountByService: map[string]int{"service1": 4},
        },
        {
            Node: &api.Node{
                ID: "node4",
                Spec: api.NodeSpec{
                    Annotations: api.Annotations{
                        Labels: map[string]string{}, // no label
                    },
                },
            },
            ActiveTasksCountByService: map[string]int{"service1": 2},
        },
        {
            Node: &api.Node{
                ID: "node5",
                Spec: api.NodeSpec{
                    Annotations: api.Annotations{
                        Labels: map[string]string{}, // no label
                    },
                },
            },
            ActiveTasksCountByService: map[string]int{"service1": 1},
        },
    }

    for _, node := range nodes {
        ns.addOrUpdateNode(node)
    }

    preferences := []*api.PlacementPreference{
        {
            Preference: &api.PlacementPreference_Spread{
                Spread: &api.SpreadOver{
                    SpreadDescriptor: "node.labels.datacenter",
                },
            },
        },
        {
            Preference: &api.PlacementPreference_Spread{
                Spread: &api.SpreadOver{
                    SpreadDescriptor: "node.labels.rack",
                },
            },
        },
    }

    // Create the tree
    tree := ns.tree("service1", preferences, 10,
        func(*NodeInfo) bool { return true },
        func(a, b *NodeInfo) bool { return true })

    // Helper function to verify task count consistency recursively
    var verifyTaskCounts func(*testing.T, *decisionTree) int
    verifyTaskCounts = func(t *testing.T, dt *decisionTree) int {
        if dt == nil {
            return 0
        }

        if dt.next == nil {
            return dt.tasks
        }

        // Calculate sum of children's tasks
        childrenSum := 0
        for _, child := range dt.next {
            childrenSum += verifyTaskCounts(t, child)
        }

        // Verify parent's task count equals sum of children
        if dt.tasks != childrenSum {
            t.Errorf("Parent task count (%d) does not equal sum of children (%d)",
                dt.tasks, childrenSum)
        }

        return dt.tasks
    }

    // Run the verification
    verifyTaskCounts(t, &tree)

    // Verify specific expected values
    if tree.tasks != 12 { // Total tasks: 3 + 2 + 4 + 2 + 1 = 12
        t.Errorf("Expected root to have 12 tasks, got %d", tree.tasks)
    }

    dc1Tasks := tree.next["dc1"].tasks
    if dc1Tasks != 5 { // dc1 tasks: 3 + 2 = 5
        t.Errorf("Expected dc1 to have 5 tasks, got %d", dc1Tasks)
    }
    dc1r1Tasks := tree.next["dc1"].next["r1"].tasks
    if dc1r1Tasks != 3 {
        t.Errorf("Expected dc1 r1 to have 3 tasks, got %d", dc1r1Tasks)
    }
    dc1r2Tasks := tree.next["dc1"].next["r2"].tasks
    if dc1r2Tasks != 2 {
        t.Errorf("Expected dc1 r2 to have 2 tasks, got %d", dc1r2Tasks)
    }

    dc2Tasks := tree.next["dc2"].tasks
    if dc2Tasks != 4 { // dc2 tasks: 4
        t.Errorf("Expected dc2 to have 4 tasks, got %d", dc2Tasks)
    }
    dc2r2Tasks := tree.next["dc2"].next["r2"].tasks
    if dc2r2Tasks != 4 {
        t.Errorf("Expected dc2 r2 to have 4 tasks, got %d", dc2r2Tasks)
    }

    otherTasks := tree.next[""].tasks
    if otherTasks != 3 {
        t.Errorf("Expected others to have 3 tasks, got %d", otherTasks)
    }
    subOtherTasks := tree.next[""].next[""].tasks
    if subOtherTasks != 3 {
        t.Errorf("Expected sub-others to have 3 tasks, got %d", subOtherTasks)
    }

}
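For reference, the assertions above amount to the following per-branch totals, shown here as a nested map. This is an illustrative sketch only, not part of the commit, and the map is not the actual decisionTree type; the "" key collects nodes that lack the corresponding label.

// Active "service1" task counts the test expects, keyed by the
// node.labels.datacenter branch and then the node.labels.rack branch.
var expectedBranchTasks = map[string]map[string]int{
    "dc1": {"r1": 3, "r2": 2}, // node1 + node2 => 5
    "dc2": {"r2": 4},          // node3         => 4
    "":    {"": 3},            // node4 + node5 => 2 + 1 = 3
}
// Root total: 5 + 4 + 3 = 12, matching the tree.tasks check above.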

manager/scheduler/scheduler.go

Lines changed: 4 additions & 1 deletion
@@ -787,9 +787,11 @@ func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGrou
 
     // Try to make branches even until either all branches are
     // full, or all tasks have been scheduled.
-    for tasksScheduled != n && len(noRoom) != len(tree.next) {
+    converging := true
+    for tasksScheduled != n && len(noRoom) != len(tree.next) && converging {
         desiredTasksPerBranch := (tasksInUsableBranches + n - tasksScheduled) / (len(tree.next) - len(noRoom))
         remainder := (tasksInUsableBranches + n - tasksScheduled) % (len(tree.next) - len(noRoom))
+        converging = false
 
         for _, subtree := range tree.next {
             if noRoom != nil {
@@ -799,6 +801,7 @@ func (s *Scheduler) scheduleNTasksOnSubtree(ctx context.Context, n int, taskGrou
             }
             subtreeTasks := subtree.tasks
             if subtreeTasks < desiredTasksPerBranch || (subtreeTasks == desiredTasksPerBranch && remainder > 0) {
+                converging = true
                 tasksToAssign := desiredTasksPerBranch - subtreeTasks
                 if remainder > 0 {
                     tasksToAssign++
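
To see why the new converging flag terminates the loop, here is a simplified, self-contained sketch of the same spreading pass. It is not swarmkit's actual code: the name spreadEvenly, the capacity slice, and the integer-keyed noRoom map are assumptions made for illustration.

package main

import "fmt"

// spreadEvenly spreads n new tasks across branches that already hold
// branchTasks[i] tasks, each branch capped at capacity[i]. The converging
// guard is the point of the sketch: if a full pass over the branches
// assigns nothing, desired cannot change on the next pass either, so
// without the guard the loop would spin forever.
func spreadEvenly(n int, branchTasks, capacity []int) int {
    scheduled := 0
    noRoom := make(map[int]bool)

    tasksInUsableBranches := 0
    for _, t := range branchTasks {
        tasksInUsableBranches += t
    }

    converging := true
    for scheduled != n && len(noRoom) != len(branchTasks) && converging {
        usable := len(branchTasks) - len(noRoom)
        desired := (tasksInUsableBranches + n - scheduled) / usable
        remainder := (tasksInUsableBranches + n - scheduled) % usable
        converging = false

        for i := range branchTasks {
            if noRoom[i] {
                continue
            }
            if branchTasks[i] < desired || (branchTasks[i] == desired && remainder > 0) {
                converging = true // at least one branch was still below target
                want := desired - branchTasks[i]
                if remainder > 0 {
                    want++
                    remainder--
                }
                if want > n-scheduled {
                    want = n - scheduled
                }
                got := want
                if room := capacity[i] - branchTasks[i]; got > room {
                    got = room
                }
                branchTasks[i] += got
                scheduled += got
                if got < want {
                    // The branch is full; stop counting it toward the average.
                    noRoom[i] = true
                    tasksInUsableBranches -= branchTasks[i]
                }
            }
        }
    }
    return scheduled
}

func main() {
    // Branches already holding 3, 1, and 3 tasks (the same shape as the
    // regression test below) receive 2 more; both land on the middle branch
    // and the loop exits after a single pass.
    fmt.Println(spreadEvenly(2, []int{3, 1, 3}, []int{10, 10, 10})) // prints 2
}

In the real scheduler each branch assignment recurses into scheduleNTasksOnSubtree and the branches are subtrees of the placement-preference decision tree, but the converging guard plays the same role: a pass that makes no progress ends the loop instead of repeating indefinitely.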

manager/scheduler/scheduler_test.go

Lines changed: 151 additions & 0 deletions
@@ -1110,6 +1110,156 @@ func TestMultiplePreferences(t *testing.T) {
11101110
t.Run("useSpecVersion=true", func(t *testing.T) { testMultiplePreferences(t, true) })
11111111
}
11121112

1113+
// TestMultiplePreferencesScaleUp is a regression test for an infinite loop
1114+
// bug in the scheduler.
1115+
func TestMultiplePreferencesScaleUp(t *testing.T) {
1116+
ctx := context.Background()
1117+
initialNodeSet := []*api.Node{
1118+
{
1119+
ID: "id11",
1120+
Status: api.NodeStatus{
1121+
State: api.NodeStatus_READY,
1122+
},
1123+
Spec: api.NodeSpec{
1124+
Annotations: api.Annotations{
1125+
Labels: map[string]string{
1126+
"az": "dc1",
1127+
"rack": "r1",
1128+
},
1129+
},
1130+
},
1131+
},
1132+
{
1133+
ID: "id12",
1134+
Status: api.NodeStatus{
1135+
State: api.NodeStatus_READY,
1136+
},
1137+
Spec: api.NodeSpec{
1138+
Annotations: api.Annotations{
1139+
Labels: map[string]string{
1140+
"az": "dc1",
1141+
"rack": "r2",
1142+
},
1143+
},
1144+
},
1145+
},
1146+
{
1147+
ID: "id21",
1148+
Status: api.NodeStatus{
1149+
State: api.NodeStatus_READY,
1150+
},
1151+
Spec: api.NodeSpec{
1152+
Annotations: api.Annotations{
1153+
Labels: map[string]string{
1154+
"az": "dc2",
1155+
"rack": "r1",
1156+
},
1157+
},
1158+
},
1159+
},
1160+
}
1161+
1162+
taskTemplate1 := &api.Task{
1163+
DesiredState: api.TaskStateRunning,
1164+
ServiceID: "service1",
1165+
// The service needs to have a spec version to be scheduled as a
1166+
// group, a necessary precondition for the scheduler
1167+
// infinite-loop bug.
1168+
SpecVersion: &api.Version{Index: 1},
1169+
Spec: api.TaskSpec{
1170+
Runtime: &api.TaskSpec_Container{
1171+
Container: &api.ContainerSpec{
1172+
Image: "v:1",
1173+
},
1174+
},
1175+
Placement: &api.Placement{
1176+
Preferences: []*api.PlacementPreference{
1177+
{
1178+
Preference: &api.PlacementPreference_Spread{
1179+
Spread: &api.SpreadOver{
1180+
SpreadDescriptor: "node.labels.az",
1181+
},
1182+
},
1183+
},
1184+
{
1185+
Preference: &api.PlacementPreference_Spread{
1186+
Spread: &api.SpreadOver{
1187+
SpreadDescriptor: "node.labels.rack",
1188+
},
1189+
},
1190+
},
1191+
},
1192+
},
1193+
},
1194+
Status: api.TaskStatus{
1195+
State: api.TaskStatePending,
1196+
},
1197+
}
1198+
1199+
s := store.NewMemoryStore(nil)
1200+
assert.NotNil(t, s)
1201+
defer s.Close()
1202+
1203+
t1Instances := 2
1204+
1205+
err := s.Update(func(tx store.Tx) error {
1206+
// Prepoulate nodes
1207+
for _, n := range initialNodeSet {
1208+
assert.NoError(t, store.CreateNode(tx, n))
1209+
}
1210+
1211+
// Prepopulate tasks from template 1
1212+
for i := 0; i != t1Instances; i++ {
1213+
taskTemplate1.ID = fmt.Sprintf("t1id%d", i)
1214+
assert.NoError(t, store.CreateTask(tx, taskTemplate1))
1215+
}
1216+
1217+
// Populate some running tasks to simulate a service scaling scenario
1218+
for node, tasks := range map[string]int{
1219+
"id11": 3,
1220+
"id12": 1,
1221+
"id21": 3,
1222+
} {
1223+
for i := 0; i != tasks; i++ {
1224+
taskTemplate1.ID = fmt.Sprintf("t1running-%s-%d", node, i)
1225+
taskTemplate1.NodeID = node
1226+
taskTemplate1.Status.State = api.TaskStateRunning
1227+
assert.NoError(t, store.CreateTask(tx, taskTemplate1))
1228+
}
1229+
}
1230+
return nil
1231+
})
1232+
assert.NoError(t, err)
1233+
1234+
scheduler := New(s)
1235+
1236+
watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
1237+
defer cancel()
1238+
1239+
go func() {
1240+
assert.NoError(t, scheduler.Run(ctx))
1241+
}()
1242+
defer scheduler.Stop()
1243+
1244+
t1Assignments := make(map[string]int)
1245+
totalAssignments := 0
1246+
for i := 0; i != t1Instances; i++ {
1247+
assignment := watchAssignment(t, watch)
1248+
if !strings.HasPrefix(assignment.ID, "t1") {
1249+
t.Fatal("got assignment for different kind of task")
1250+
}
1251+
t1Assignments[assignment.NodeID]++
1252+
totalAssignments++
1253+
}
1254+
1255+
t.Logf("t1Assignments: %#v", t1Assignments)
1256+
assert.Equal(t, t1Instances, totalAssignments)
1257+
// It would be valid for the scheduler either assign the tasks to id12,
1258+
// which balances r1 and r2 of dc1, or assign the tasks to id21, which
1259+
// balances dc1 and dc2.
1260+
assert.Equal(t, 2, t1Assignments["id12"]+t1Assignments["id21"])
1261+
}
1262+
11131263
func TestSchedulerNoReadyNodes(t *testing.T) {
11141264
ctx := context.Background()
11151265
initialTask := &api.Task{
@@ -2698,6 +2848,7 @@ func watchAssignmentFailure(t *testing.T, watch chan events.Event) *api.Task {
}

func watchAssignment(t *testing.T, watch chan events.Event) *api.Task {
    t.Helper()
    for {
        select {
        case event := <-watch:
