Skip to content

Commit 6c9bafc

Browse files
committed
feat: implement metrics provider
1 parent 30e9194 commit 6c9bafc

File tree

2 files changed

+195
-15
lines changed

2 files changed

+195
-15
lines changed
Lines changed: 83 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,98 @@
11
package autoscaler
22

3-
import "time"
3+
import (
4+
"time"
5+
6+
"github.com/NexusGPU/tensor-fusion/internal/metrics"
7+
"gorm.io/gorm"
8+
)
49

510
// WorkerMetrics is one usage sample for a single worker of a workload.
// The providers in this file fill it from a database row: TflopsUsage comes
// from the compute_tflops column, VramUsage from memory_bytes, and Timestamp
// from either the row timestamp or the aggregation window.
type WorkerMetrics struct {
6-
Workload string
7-
Worker string
8-
TflopsUsage ResourceAmount
9-
VramUsage ResourceAmount
10-
Timestamp time.Time
11+
WorkloadName string
12+
WorkerName string
13+
TflopsUsage ResourceAmount
14+
VramUsage ResourceAmount
15+
Timestamp time.Time
1116
}
1217

1318
// MetricsProvider is the source of worker usage samples for the autoscaler.
// Both methods return an error instead of panicking when the backing store
// cannot be queried; the returned slice is nil in that case.
type MetricsProvider interface {
14-
GetWorkersMetrics() []*WorkerMetrics
15-
GetHistoryMetrics() []*WorkerMetrics
19+
GetWorkersMetrics() ([]*WorkerMetrics, error)
20+
GetHistoryMetrics() ([]*WorkerMetrics, error)
21+
}
22+
23+
func NewMetricsProvider(db *gorm.DB) MetricsProvider {
24+
return &greptimeDBProvider{db: db}
1625
}
1726

18-
func NewMetricsProvider() MetricsProvider {
19-
return &GreptimeDBProvider{}
27+
// greptimeDBProvider implements MetricsProvider on top of a gorm connection.
type greptimeDBProvider struct {
28+
// db is the handle every query in this file is issued through.
db *gorm.DB
29+
// lastQueryTime is the upper bound of the previous successful query and is
// used as the lower bound of the next real-time query.
lastQueryTime time.Time
30+
// historyDuration is how far back GetHistoryMetrics looks.
// NOTE(review): nothing visible in this file assigns it — confirm where it is configured.
historyDuration time.Duration
2031
}
2132

22-
type GreptimeDBProvider struct{}
33+
func (g *greptimeDBProvider) GetWorkersMetrics() ([]*WorkerMetrics, error) {
34+
data := []*metrics.HypervisorWorkerUsageMetrics{}
35+
now := time.Now()
36+
// actual meaning: max(avg[10s])[1m]
37+
err := g.db.Select("workload, worker, max(compute_tflops) as compute_tflops, max(memory_bytes) as memory_bytes, max(ts) as ts").
38+
Where("ts > ? and ts <= ?", g.lastQueryTime.Nanosecond(), now.Nanosecond()).
39+
Group("workload, worker").
40+
Order("ts asc").
41+
Error
42+
43+
if err != nil {
44+
return nil, err
45+
}
46+
47+
g.lastQueryTime = now
48+
49+
workersMetrics := make([]*WorkerMetrics, 0, len(data))
50+
for _, row := range data {
51+
workersMetrics = append(workersMetrics, &WorkerMetrics{
52+
WorkloadName: row.WorkloadName,
53+
WorkerName: row.WorkerName,
54+
TflopsUsage: resourceAmountFromFloat(row.ComputeTflops),
55+
VramUsage: ResourceAmount(row.VRAMBytes),
56+
Timestamp: row.Timestamp,
57+
})
58+
}
2359

24-
func (*GreptimeDBProvider) GetWorkersMetrics() []*WorkerMetrics {
25-
panic("unimplemented")
60+
return workersMetrics, nil
2661
}
2762

28-
func (*GreptimeDBProvider) GetHistoryMetrics() []*WorkerMetrics {
29-
panic("unimplemented")
63+
// hypervisorWorkerUsageMetrics is the scan target for the history query: the
// shared usage row plus the extra time_window column that query selects.
type hypervisorWorkerUsageMetrics struct {
64+
metrics.HypervisorWorkerUsageMetrics
65+
// TimeWindow receives the date_bin(...) value aliased as time_window.
TimeWindow time.Time `gorm:"column:time_window;index:,class:TIME"`
66+
}
67+
68+
func (g *greptimeDBProvider) GetHistoryMetrics() ([]*WorkerMetrics, error) {
69+
data := []*hypervisorWorkerUsageMetrics{}
70+
now := time.Now()
71+
// TODO: replace using iteration for handling large datasets efficiently
72+
// TODO: supply history resolution to config time window
73+
err := g.db.Select("workload, worker, max(compute_tflops) as compute_tflops, max(memory_bytes) as memory_bytes, date_bin('1 minute'::INTERVAL, ts) as time_window").
74+
Where("ts > ? and ts <= ?", now.Add(-g.historyDuration), now.Nanosecond()).
75+
Group("workload, worker, time_window").
76+
Order("time_window asc").
77+
Find(&data).
78+
Error
79+
80+
if err != nil {
81+
return nil, err
82+
}
83+
84+
g.lastQueryTime = now
85+
86+
workersMetrics := make([]*WorkerMetrics, 0, len(data))
87+
for _, row := range data {
88+
workersMetrics = append(workersMetrics, &WorkerMetrics{
89+
WorkloadName: row.WorkloadName,
90+
WorkerName: row.WorkerName,
91+
TflopsUsage: resourceAmountFromFloat(row.ComputeTflops),
92+
VramUsage: ResourceAmount(row.VRAMBytes),
93+
Timestamp: row.TimeWindow,
94+
})
95+
}
96+
97+
return workersMetrics, nil
3098
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package autoscaler
2+
3+
import (
4+
"regexp"
5+
"time"
6+
7+
"github.com/DATA-DOG/go-sqlmock"
8+
"github.com/NexusGPU/tensor-fusion/internal/metrics"
9+
. "github.com/onsi/ginkgo/v2"
10+
. "github.com/onsi/gomega"
11+
"gorm.io/driver/mysql"
12+
"gorm.io/gorm"
13+
)
14+
15+
var _ = Describe("MetricsProvider", func() {
16+
Context("when getting real time workers metrics", func() {
17+
It("should return slices", func() {
18+
db, mock := NewMockDB()
19+
now := time.Now()
20+
fakeMetrics := []metrics.HypervisorWorkerUsageMetrics{
21+
{
22+
WorkloadName: "workload-0",
23+
WorkerName: "worker-0",
24+
ComputeTflops: 10.3,
25+
VRAMBytes: 1 * 1000 * 1000 * 1000,
26+
Timestamp: now,
27+
},
28+
{
29+
WorkloadName: "workload-1",
30+
WorkerName: "worker-1",
31+
ComputeTflops: 10.3,
32+
VRAMBytes: 1 * 1000 * 1000 * 1000,
33+
Timestamp: now,
34+
},
35+
}
36+
37+
rows := sqlmock.NewRows([]string{"workload", "worker", "compute_tflops", "memory_bytes", "ts"})
38+
for _, row := range fakeMetrics {
39+
rows.AddRow(row.WorkloadName, row.WorkerName, row.ComputeTflops, row.VRAMBytes, row.Timestamp)
40+
}
41+
42+
mock.ExpectQuery(regexp.QuoteMeta("SELECT workload, worker, max(compute_tflops) as compute_tflops, max(memory_bytes) as memory_bytes, max(ts) as ts FROM `tf_worker_usage` WHERE ts > ? GROUP BY workload, worker")).
43+
WillReturnRows(rows)
44+
provider := &greptimeDBProvider{db: db}
45+
got, _ := provider.GetWorkersMetrics()
46+
Expect(got).To(HaveLen(2))
47+
Expect(got[0].WorkloadName).To(Equal(fakeMetrics[0].WorkloadName))
48+
Expect(got[0].WorkerName).To(Equal(fakeMetrics[0].WorkerName))
49+
Expect(got[0].VramUsage).To(Equal(ResourceAmount(fakeMetrics[0].VRAMBytes)))
50+
Expect(got[0].TflopsUsage).To(Equal(resourceAmountFromFloat(fakeMetrics[0].ComputeTflops)))
51+
Expect(got[0].Timestamp).To(Equal(fakeMetrics[0].Timestamp))
52+
})
53+
})
54+
55+
Context("when getting history workers metrics", func() {
56+
FIt("should return slices", func() {
57+
db, mock := NewMockDB()
58+
now := time.Now()
59+
fakeMetrics := []hypervisorWorkerUsageMetrics{
60+
{
61+
HypervisorWorkerUsageMetrics: metrics.HypervisorWorkerUsageMetrics{
62+
WorkloadName: "workload-0",
63+
WorkerName: "worker-0",
64+
ComputeTflops: 10.3,
65+
VRAMBytes: 1 * 1000 * 1000 * 1000,
66+
Timestamp: now,
67+
},
68+
TimeWindow: now,
69+
},
70+
{
71+
HypervisorWorkerUsageMetrics: metrics.HypervisorWorkerUsageMetrics{
72+
WorkloadName: "workload-1",
73+
WorkerName: "worker-1",
74+
ComputeTflops: 10.3,
75+
VRAMBytes: 1 * 1000 * 1000 * 1000,
76+
Timestamp: now,
77+
},
78+
TimeWindow: now,
79+
},
80+
}
81+
82+
rows := sqlmock.NewRows([]string{"workload", "worker", "compute_tflops", "memory_bytes", "time_window"})
83+
for _, row := range fakeMetrics {
84+
rows.AddRow(row.WorkloadName, row.WorkerName, row.ComputeTflops, row.VRAMBytes, row.TimeWindow)
85+
}
86+
87+
mock.ExpectQuery(regexp.QuoteMeta("SELECT workload, worker, max(compute_tflops) as compute_tflops, max(memory_bytes) as memory_bytes, date_bin('1 minute'::INTERVAL, ts) as time_window FROM `tf_worker_usage` WHERE ts > ? and ts <= ? GROUP BY workload, worker, time_window ORDER BY time_window asc")).
88+
WillReturnRows(rows)
89+
provider := &greptimeDBProvider{db: db}
90+
got, _ := provider.GetHistoryMetrics()
91+
Expect(got).To(HaveLen(2))
92+
Expect(got[0].WorkloadName).To(Equal(fakeMetrics[0].WorkloadName))
93+
Expect(got[0].WorkerName).To(Equal(fakeMetrics[0].WorkerName))
94+
Expect(got[0].VramUsage).To(Equal(ResourceAmount(fakeMetrics[0].VRAMBytes)))
95+
Expect(got[0].TflopsUsage).To(Equal(resourceAmountFromFloat(fakeMetrics[0].ComputeTflops)))
96+
Expect(got[0].Timestamp).To(Equal(fakeMetrics[0].TimeWindow))
97+
})
98+
})
99+
})
100+
101+
func NewMockDB() (*gorm.DB, sqlmock.Sqlmock) {
102+
GinkgoHelper()
103+
db, mock, err := sqlmock.New()
104+
Expect(err).ToNot(HaveOccurred())
105+
gormDB, err := gorm.Open(mysql.New(mysql.Config{
106+
Conn: db,
107+
SkipInitializeWithVersion: true,
108+
}), &gorm.Config{})
109+
Expect(err).ToNot(HaveOccurred())
110+
111+
return gormDB, mock
112+
}

0 commit comments

Comments
 (0)