Skip to content

Commit c584d1b

Browse files
refactor: improve gc
Signed-off-by: Yaroslav Borbat <[email protected]>
1 parent 714a95d commit c584d1b

File tree

9 files changed

+575
-202
lines changed

9 files changed

+575
-202
lines changed

images/virtualization-artifact/pkg/controller/gc/cron_source.go

Lines changed: 50 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -17,166 +17,95 @@ limitations under the License.
1717
package gc
1818

1919
import (
20-
"cmp"
2120
"context"
2221
"fmt"
23-
"slices"
2422
"time"
2523

2624
"github.com/robfig/cron/v3"
27-
"k8s.io/apimachinery/pkg/api/meta"
28-
"k8s.io/apimachinery/pkg/runtime"
2925
"k8s.io/apimachinery/pkg/types"
3026
"k8s.io/client-go/util/workqueue"
3127
ctrl "sigs.k8s.io/controller-runtime"
3228
"sigs.k8s.io/controller-runtime/pkg/client"
33-
"sigs.k8s.io/controller-runtime/pkg/event"
3429
"sigs.k8s.io/controller-runtime/pkg/handler"
3530
"sigs.k8s.io/controller-runtime/pkg/predicate"
3631
"sigs.k8s.io/controller-runtime/pkg/source"
3732

3833
"github.com/deckhouse/deckhouse/pkg/log"
39-
"github.com/deckhouse/virtualization-controller/pkg/common/object"
34+
"k8s.io/utils/clock"
35+
4036
"github.com/deckhouse/virtualization-controller/pkg/logger"
4137
)
4238

4339
var _ source.Source = &CronSource{}
4440

4541
const sourceName = "CronSource"
4642

47-
func NewCronSource(c client.Client,
48-
standardSpec string,
49-
objList client.ObjectList,
50-
option CronSourceOption,
51-
log *log.Logger,
52-
) *CronSource {
53-
return &CronSource{
54-
Client: c,
55-
standardSpec: standardSpec,
56-
objList: objList,
57-
option: option,
58-
log: log.With("WatchSource", sourceName),
59-
}
60-
}
61-
62-
type CronSource struct {
63-
client.Client
64-
standardSpec string
65-
objList client.ObjectList
66-
option CronSourceOption
67-
log *log.Logger
43+
type SourceGCManager interface {
44+
ListForDelete(ctx context.Context) ([]client.Object, error)
6845
}
6946

70-
type CronSourceOption struct {
71-
GetOlder func(objList client.ObjectList) client.ObjectList
72-
}
73-
74-
func NewDefaultCronSourceOption(objs client.ObjectList, ttl time.Duration, log *log.Logger) CronSourceOption {
75-
return CronSourceOption{
76-
GetOlder: DefaultGetOlder(objs, ttl, 10, log),
47+
func NewCronSource(scheduleSpec string, client client.Client, mgr SourceGCManager, log *log.Logger) (*CronSource, error) {
48+
schedule, err := cron.ParseStandard(scheduleSpec)
49+
if err != nil {
50+
return nil, fmt.Errorf("parsing standard spec %q: %w", scheduleSpec, err)
7751
}
78-
}
79-
80-
func DefaultGetOlder(objs client.ObjectList, ttl time.Duration, maxCount int, log *log.Logger) func(objList client.ObjectList) client.ObjectList {
81-
return func(objList client.ObjectList) client.ObjectList {
82-
var expiredItems []runtime.Object
83-
var notExpiredItems []runtime.Object
8452

85-
if err := meta.EachListItem(objList, func(o runtime.Object) error {
86-
obj, ok := o.(client.Object)
87-
if !ok {
88-
return nil
89-
}
90-
if object.GetAge(obj) > ttl {
91-
expiredItems = append(expiredItems, o)
92-
} else {
93-
notExpiredItems = append(notExpiredItems, o)
94-
}
95-
96-
return nil
97-
}); err != nil {
98-
log.Error("failed to populate list", logger.SlogErr(err))
99-
}
100-
101-
if maxCount != 0 && len(notExpiredItems) > maxCount {
102-
slices.SortFunc(notExpiredItems, func(a, b runtime.Object) int {
103-
aObj, _ := a.(client.Object)
104-
bObj, _ := b.(client.Object)
105-
106-
return cmp.Compare(object.GetAge(aObj), object.GetAge(bObj))
107-
})
108-
expiredItems = append(expiredItems, notExpiredItems[maxCount:]...)
109-
}
53+
return &CronSource{
54+
schedule: schedule,
55+
client: client,
56+
mgr: mgr,
57+
log: log.With("WatchSource", sourceName),
58+
clock: clock.RealClock{},
59+
}, nil
60+
}
11061

111-
if err := meta.SetList(objs, expiredItems); err != nil {
112-
log.Error("failed to set list", logger.SlogErr(err))
113-
}
114-
return objs
115-
}
62+
type CronSource struct {
63+
schedule cron.Schedule
64+
client client.Client
65+
mgr SourceGCManager
66+
log *log.Logger
67+
clock clock.Clock
11668
}
11769

118-
func (c *CronSource) Start(ctx context.Context, _ handler.EventHandler, queue workqueue.RateLimitingInterface, predicates ...predicate.Predicate) error {
119-
schedule, err := cron.ParseStandard(c.standardSpec)
120-
if err != nil {
121-
return fmt.Errorf("parsing standard spec %q: %w", c.standardSpec, err)
122-
}
123-
work := func() {
124-
if err = meta.SetList(c.objList, nil); err != nil {
125-
c.log.Error("failed to reset resource list", logger.SlogErr(err))
126-
return
127-
}
128-
if err = c.List(ctx, c.objList); err != nil {
129-
c.log.Error("failed to listing resources", logger.SlogErr(err))
130-
return
131-
}
132-
if meta.LenList(c.objList) == 0 {
133-
c.log.Debug("no resources, skip")
134-
return
135-
}
136-
if c.option.GetOlder != nil {
137-
c.objList = c.option.GetOlder(c.objList)
138-
}
139-
if err = meta.EachListItem(c.objList, func(object runtime.Object) error {
140-
obj, ok := object.(client.Object)
141-
if !ok {
142-
c.log.Error(fmt.Sprintf("%s's type isn't metav1.Object", object.GetObjectKind().GroupVersionKind().String()))
143-
return nil
144-
}
145-
genericEvent := event.GenericEvent{Object: obj}
146-
for _, p := range predicates {
147-
if !p.Generic(genericEvent) {
148-
c.log.Debug(fmt.Sprintf("skip enqueue object %s/%s due to the predicate.", obj.GetNamespace(), obj.GetName()))
149-
return nil
150-
}
151-
}
152-
queue.Add(ctrl.Request{
153-
NamespacedName: types.NamespacedName{
154-
Namespace: obj.GetNamespace(),
155-
Name: obj.GetName(),
156-
},
157-
})
158-
c.log.Debug(fmt.Sprintf("resource %s/%s enqueued", obj.GetNamespace(), obj.GetName()))
159-
return nil
160-
}); err != nil {
161-
c.log.Error("failed to enqueueing resources", logger.SlogErr(err))
162-
return
163-
}
164-
}
165-
ta := nextScheduleTimeDuration(schedule, time.Now())
70+
func (c *CronSource) Start(ctx context.Context, _ handler.EventHandler, queue workqueue.RateLimitingInterface, _ ...predicate.Predicate) error {
71+
ta := nextScheduleTimeDuration(c.schedule, c.clock.Now())
16672
go func() {
16773
for {
16874
select {
16975
case <-ctx.Done():
17076
return
171-
case <-time.After(ta):
172-
work()
173-
ta = nextScheduleTimeDuration(schedule, time.Now())
77+
case <-c.clock.After(ta):
78+
c.addObjects(ctx, queue.Add)
79+
ta = nextScheduleTimeDuration(c.schedule, c.clock.Now())
17480
}
17581
}
17682
}()
17783
return nil
17884
}
17985

86+
func (c *CronSource) addObjects(ctx context.Context, addToQueue func(interface{})) {
87+
objs, err := c.mgr.ListForDelete(ctx)
88+
if err != nil {
89+
c.log.Error("Failed to get ObjectList for delete", logger.SlogErr(err))
90+
return
91+
}
92+
93+
if len(objs) == 0 {
94+
c.log.Debug("No resources, skip")
95+
return
96+
}
97+
98+
for _, obj := range objs {
99+
addToQueue(ctrl.Request{
100+
NamespacedName: types.NamespacedName{
101+
Namespace: obj.GetNamespace(),
102+
Name: obj.GetName(),
103+
},
104+
})
105+
c.log.Debug(fmt.Sprintf("Resource %s/%s enqueued", obj.GetNamespace(), obj.GetName()))
106+
}
107+
}
108+
180109
func nextScheduleTimeDuration(schedule cron.Schedule, now time.Time) time.Duration {
181110
return schedule.Next(now).Sub(now)
182111
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package gc
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
dlog "github.com/deckhouse/deckhouse/pkg/log"
9+
. "github.com/onsi/ginkgo/v2"
10+
. "github.com/onsi/gomega"
11+
apiruntime "k8s.io/apimachinery/pkg/runtime"
12+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
13+
clock "k8s.io/utils/clock/testing"
14+
ctrl "sigs.k8s.io/controller-runtime"
15+
"sigs.k8s.io/controller-runtime/pkg/client"
16+
"sigs.k8s.io/controller-runtime/pkg/client/fake"
17+
18+
"github.com/deckhouse/virtualization-controller/pkg/common/pwgen"
19+
"github.com/deckhouse/virtualization-controller/pkg/common/testutil"
20+
)
21+
22+
var _ = FDescribe("CronSource", func() {
23+
const (
24+
// Every day at 00:00
25+
scheduleSpec = "0 * * * *"
26+
)
27+
28+
var (
29+
log *dlog.Logger
30+
ctx context.Context
31+
fakeClient client.Client
32+
mgr *fakeGCManager
33+
fakeClock *clock.FakeClock
34+
//queue workqueue.RateLimitingInterface
35+
)
36+
37+
BeforeEach(func() {
38+
log = testutil.NewNoOpLogger()
39+
ctx = testutil.ToContext(context.Background(), log)
40+
41+
scheme := apiruntime.NewScheme()
42+
for _, f := range []func(*apiruntime.Scheme) error{
43+
clientgoscheme.AddToScheme,
44+
AddToScheme,
45+
} {
46+
Expect(f(scheme)).To(Succeed())
47+
}
48+
49+
fakeClient = fake.NewClientBuilder().WithScheme(scheme).Build()
50+
51+
mgr = newFakeGCManager(fakeClient, time.Hour, 10)
52+
fakeClock = clock.NewFakeClock(time.Now())
53+
//queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
54+
})
55+
56+
newSource := func(scheduleSpec string) *CronSource {
57+
source, err := NewCronSource(scheduleSpec, fakeClient, mgr, log)
58+
source.clock = fakeClock
59+
Expect(err).NotTo(HaveOccurred())
60+
return source
61+
}
62+
63+
It("should enqueue 100 objects in completed state and delete them", func() {
64+
source := newSource(scheduleSpec)
65+
66+
spawnFakeObjects(10, 10, fakeObjectPhaseCompleted, fakeClient)
67+
spawnFakeObjects(10, 10, fakeObjectPhasePending, fakeClient)
68+
spawnFakeObjects(10, 10, fakeObjectPhaseRunning, fakeClient)
69+
70+
var enqueued []ctrl.Request
71+
source.addObjects(ctx, func(obj interface{}) {
72+
req := obj.(ctrl.Request)
73+
fakeObj := &FakeObject{}
74+
Expect(fakeClient.Get(ctx, req.NamespacedName, fakeObj)).To(Succeed())
75+
Expect(fakeObj.Phase).To(Equal(fakeObjectPhaseCompleted))
76+
enqueued = append(enqueued, req)
77+
})
78+
Expect(len(enqueued)).To(Equal(100))
79+
80+
reconciler := NewReconciler(fakeClient, source, mgr)
81+
for _, req := range enqueued {
82+
result, err := reconciler.Reconcile(ctx, req)
83+
Expect(err).NotTo(HaveOccurred())
84+
Expect(result.IsZero()).To(BeTrue())
85+
}
86+
87+
objs := &FakeObjectList{}
88+
Expect(fakeClient.List(ctx, objs)).To(Succeed())
89+
Expect(len(objs.Items)).To(Equal(200))
90+
for _, obj := range objs.Items {
91+
Expect(obj.Phase).NotTo(Equal(fakeObjectPhaseCompleted))
92+
}
93+
})
94+
95+
//It("should return the oldest object", func() {
96+
// // Every day at 00:00
97+
// scheduleSpec := "0 * * * *"
98+
// source := newSource(scheduleSpec)
99+
//
100+
// for i := range 2 {
101+
// namespace := fmt.Sprintf("test-namespace-%d", i)
102+
// for j := range 5 {
103+
// obj := NewFakeObject(fmt.Sprintf("fake-%d", j), namespace)
104+
// obj.Phase = fakeObjectPhaseCompleted
105+
//
106+
// Expect(obj.Phase).To(Equal(fakeObjectPhaseCompleted))
107+
// Expect(fakeClient.Create(context.Background(), obj)).To(Succeed())
108+
// }
109+
// }
110+
//
111+
// Expect(source.Start(context.Background(), nil, queue)).To(Succeed())
112+
// Expect(queue.Len()).To(Equal(0))
113+
//
114+
// fakeClock.Step(time.Hour * 25)
115+
//
116+
// Eventually(queue.Len()).WithTimeout(10 * time.Second).Should(Equal(20))
117+
//})
118+
})
119+
120+
func spawnFakeObjects(countNamespaces, countPerNamespace int, phase string, client client.Client) {
121+
for i := 0; i < countNamespaces; i++ {
122+
namespace := fmt.Sprintf("test-namespace-%s-%d", pwgen.AlphaNum(32), i)
123+
for j := 0; j < countPerNamespace; j++ {
124+
obj := NewFakeObject(fmt.Sprintf("fake-%d", j), namespace)
125+
obj.Phase = phase
126+
Expect(client.Create(context.Background(), obj)).To(Succeed())
127+
}
128+
}
129+
}

0 commit comments

Comments
 (0)