Skip to content

Commit 30e9194

Browse files
committed
feat: aggregate samples into histogram per tflops
1 parent 7122a19 commit 30e9194

File tree

2 files changed

+25
-43
lines changed

2 files changed

+25
-43
lines changed

internal/autoscaler/autoscaler_test.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,15 @@ import (
3333
"sigs.k8s.io/controller-runtime/pkg/client"
3434
)
3535

36+
// tflops add all samples, like cpu in vpa
37+
// Consider gpu allocator, check if enough tflops or vram to allocate
38+
// cron scheduler strategy
39+
// Add AutoSetResources to schedulingconfigtemplate and make it more configurable
40+
// refactor main, setup database may not put in leader election runnable group
41+
// scale to zero when query data if no usage, need carl to support
42+
// add recommendation to workload
43+
// resolve conversation on github, thanks for reviews
44+
3645
var _ = Describe("Autoscaler", func() {
3746
Context("when creating an autoscaler", func() {
3847
It("should return an error if there is no client", func() {
@@ -49,8 +58,8 @@ var _ = Describe("Autoscaler", func() {
4958
scaler.LoadHistoryMetrics(ctx)
5059
metrics := scaler.MetricsProvider.GetHistoryMetrics()
5160
for _, m := range metrics {
52-
Expect(scaler.WorkloadStates).To(HaveKey(m.Workload))
53-
Expect(scaler.WorkerStates).To(HaveKey(m.Worker))
61+
Expect(scaler.WorkloadStates).To(HaveKey(m.WorkloadName))
62+
Expect(scaler.WorkerStates).To(HaveKey(m.WorkerName))
5463
}
5564
})
5665
})
@@ -101,7 +110,7 @@ var _ = Describe("Autoscaler", func() {
101110
})
102111

103112
Context("when loading real time metrics", func() {
104-
It("should update the state of workloads and workers", func() {
113+
FIt("should update the state of workloads and workers", func() {
105114
tfEnv := NewTensorFusionEnvBuilder().
106115
AddPoolWithNodeCount(1).SetGpuCountPerNode(1).
107116
Build()
@@ -117,17 +126,16 @@ var _ = Describe("Autoscaler", func() {
117126
scaler.LoadWorkloads(ctx)
118127
ws := scaler.WorkloadStates[workload.Name]
119128
metrics := &WorkerMetrics{
120-
Workload: workload.Name,
121-
Worker: worker,
122-
TflopsUsage: ResourceAmount(12.0),
123-
VramUsage: 9000,
124-
Timestamp: time.Now(),
129+
WorkloadName: workload.Name,
130+
WorkerName: worker,
131+
TflopsUsage: ResourceAmount(12.0),
132+
VramUsage: 9000,
133+
Timestamp: time.Now(),
125134
}
126135

127136
scaler.MetricsProvider = &FakeMetricsProvider{[]*WorkerMetrics{metrics}}
128137
scaler.LoadRealTimeMetrics(ctx)
129138

130-
Expect(scaler.WorkerStates[worker].TflopsPeak).To(Equal(metrics.TflopsUsage))
131139
Expect(scaler.WorkerStates[worker].LastTflopsSampleTime).To(Equal(metrics.Timestamp))
132140
Expect(ws.TflopsHistogram.IsEmpty()).To(BeFalse())
133141
Expect(scaler.WorkerStates[worker].VramPeak).To(Equal(metrics.VramUsage))
@@ -302,11 +310,11 @@ func (f *FakeMetricsProvider) GetHistoryMetrics() []*WorkerMetrics {
302310
for hour := 0; hour < 24; hour++ {
303311
idx := day*24 + hour
304312
metrics = append(metrics, &WorkerMetrics{
305-
Workload: "workload-0",
306-
Worker: fmt.Sprintf("worker-%d", idx),
307-
TflopsUsage: ResourceAmount(10.0 + float64(idx%10)),
308-
VramUsage: 1 * 1024 * 1024 * 1024,
309-
Timestamp: startTime.Add(time.Duration(day*24+hour) * time.Hour),
313+
WorkloadName: "workload-0",
314+
WorkerName: fmt.Sprintf("worker-%d", idx),
315+
TflopsUsage: ResourceAmount(10.0 + float64(idx%10)),
316+
VramUsage: 1 * 1024 * 1024 * 1024,
317+
Timestamp: startTime.Add(time.Duration(day*24+hour) * time.Hour),
310318
})
311319
}
312320
}

internal/autoscaler/workerstate.go

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ import (
77
type WorkerState struct {
88
Name string
99
Workload string
10-
TflopsPeak ResourceAmount
1110
LastTflopsSampleTime time.Time
12-
TflopsWindowEnd time.Time
1311

1412
VramPeak ResourceAmount
1513
LastVramSampleTime time.Time
@@ -21,41 +19,17 @@ func NewWorkerState(name string, workload string) *WorkerState {
2119
Name: name,
2220
Workload: workload,
2321
LastTflopsSampleTime: time.Time{},
24-
TflopsWindowEnd: time.Time{},
2522
LastVramSampleTime: time.Time{},
2623
VramWindowEnd: time.Time{},
2724
}
2825
}
2926

3027
func (w *WorkerState) AddTflopsSample(workload *WorkloadState, metrics *WorkerMetrics) bool {
31-
ts := metrics.Timestamp
32-
if ts.Before(w.LastTflopsSampleTime) {
28+
if metrics.Timestamp.Before(w.LastTflopsSampleTime) {
3329
return false
3430
}
35-
w.LastTflopsSampleTime = ts
36-
if w.TflopsWindowEnd.IsZero() {
37-
w.TflopsWindowEnd = ts
38-
}
39-
40-
addNewPeak := false
41-
if ts.Before(w.TflopsWindowEnd) {
42-
if w.TflopsPeak != 0 && metrics.TflopsUsage > w.TflopsPeak {
43-
workload.TflopsHistogram.SubtractSample(float64(w.TflopsPeak), 1.0, w.TflopsWindowEnd)
44-
addNewPeak = true
45-
}
46-
} else {
47-
aggregationInteval := DefaultAggregationInterval
48-
shift := ts.Sub(w.TflopsWindowEnd).Truncate(aggregationInteval) + aggregationInteval
49-
w.TflopsWindowEnd = w.TflopsWindowEnd.Add(shift)
50-
w.TflopsPeak = 0
51-
addNewPeak = true
52-
}
53-
54-
if addNewPeak {
55-
workload.TflopsHistogram.AddSample(float64(metrics.TflopsUsage), 1.0, metrics.Timestamp)
56-
w.TflopsPeak = metrics.TflopsUsage
57-
}
58-
31+
workload.TflopsHistogram.AddSample(float64(metrics.TflopsUsage), minSampleWeight, metrics.Timestamp)
32+
w.LastTflopsSampleTime = metrics.Timestamp
5933
return true
6034
}
6135

0 commit comments

Comments
 (0)