Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 47 additions & 114 deletions images/virtualization-artifact/pkg/controller/gc/cron_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,157 +17,90 @@ limitations under the License.
package gc

import (
"cmp"
"context"
"fmt"
"slices"
"time"

"github.com/robfig/cron/v3"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/deckhouse/deckhouse/pkg/log"
"github.com/deckhouse/virtualization-controller/pkg/common/object"
"github.com/deckhouse/virtualization-controller/pkg/logger"
)

const sourceName = "CronSource"

func NewCronSource(c client.Client,
standardSpec string,
objList client.ObjectList,
option CronSourceOption,
log *log.Logger,
) *CronSource {
return &CronSource{
Client: c,
standardSpec: standardSpec,
objList: objList,
option: option,
log: log.With("WatchSource", sourceName),
}
}

var _ source.Source = &CronSource{}

type CronSource struct {
client.Client
standardSpec string
objList client.ObjectList
option CronSourceOption
log *log.Logger
}
const sourceName = "CronSource"

type CronSourceOption struct {
GetOlder func(objList client.ObjectList) client.ObjectList
type SourceGCManager interface {
ListForDelete(ctx context.Context, now time.Time) ([]client.Object, error)
}

func NewDefaultCronSourceOption(objs client.ObjectList, ttl time.Duration, log *log.Logger) CronSourceOption {
return CronSourceOption{
GetOlder: DefaultGetOlder(objs, ttl, 10, log),
func NewCronSource(scheduleSpec string, mgr SourceGCManager, log *log.Logger) (*CronSource, error) {
schedule, err := cron.ParseStandard(scheduleSpec)
if err != nil {
return nil, fmt.Errorf("parsing standard spec %q: %w", scheduleSpec, err)
}
}

func DefaultGetOlder(objs client.ObjectList, ttl time.Duration, maxCount int, log *log.Logger) func(objList client.ObjectList) client.ObjectList {
return func(objList client.ObjectList) client.ObjectList {
var expiredItems []runtime.Object
var notExpiredItems []runtime.Object

if err := meta.EachListItem(objList, func(o runtime.Object) error {
obj, ok := o.(client.Object)
if !ok {
return nil
}
if object.GetAge(obj) > ttl {
expiredItems = append(expiredItems, o)
} else {
notExpiredItems = append(notExpiredItems, o)
}

return nil
}); err != nil {
log.Error("failed to populate list", logger.SlogErr(err))
}

if maxCount != 0 && len(notExpiredItems) > maxCount {
slices.SortFunc(notExpiredItems, func(a, b runtime.Object) int {
aObj, _ := a.(client.Object)
bObj, _ := b.(client.Object)

return cmp.Compare(object.GetAge(aObj), object.GetAge(bObj))
})
expiredItems = append(expiredItems, notExpiredItems[maxCount:]...)
}
return &CronSource{
schedule: schedule,
mgr: mgr,
log: log.With("WatchSource", sourceName),
clock: &clock.RealClock{},
}, nil
}

if err := meta.SetList(objs, expiredItems); err != nil {
log.Error("failed to set list", logger.SlogErr(err))
}
return objs
}
type CronSource struct {
schedule cron.Schedule
mgr SourceGCManager
log *log.Logger
clock clock.Clock
}

func (c *CronSource) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
schedule, err := cron.ParseStandard(c.standardSpec)
if err != nil {
return fmt.Errorf("parsing standard spec %q: %w", c.standardSpec, err)
}
work := func() {
if err = meta.SetList(c.objList, nil); err != nil {
c.log.Error("failed to reset resource list", logger.SlogErr(err))
return
}
if err = c.List(ctx, c.objList); err != nil {
c.log.Error("failed to listing resources", logger.SlogErr(err))
return
}
if meta.LenList(c.objList) == 0 {
c.log.Debug("no resources, skip")
return
}
if c.option.GetOlder != nil {
c.objList = c.option.GetOlder(c.objList)
}
if err = meta.EachListItem(c.objList, func(object runtime.Object) error {
obj, ok := object.(client.Object)
if !ok {
c.log.Error(fmt.Sprintf("%s's type isn't metav1.Object", object.GetObjectKind().GroupVersionKind().String()))
return nil
}

queue.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
},
})
c.log.Debug(fmt.Sprintf("resource %s/%s enqueued", obj.GetNamespace(), obj.GetName()))
return nil
}); err != nil {
c.log.Error("failed to enqueueing resources", logger.SlogErr(err))
return
}
}
ta := nextScheduleTimeDuration(schedule, time.Now())
nextTime := nextScheduleTimeDuration(c.schedule, c.clock.Now())
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(ta):
work()
ta = nextScheduleTimeDuration(schedule, time.Now())
case <-c.clock.After(nextTime):
c.addObjects(ctx, queue.Add)
nextTime = nextScheduleTimeDuration(c.schedule, c.clock.Now())
}
}
}()
return nil
}

func (c *CronSource) addObjects(ctx context.Context, addToQueue func(reconcile.Request)) {
objs, err := c.mgr.ListForDelete(ctx, c.clock.Now())
if err != nil {
c.log.Error("Failed to get ObjectList for delete", logger.SlogErr(err))
return
}

if len(objs) == 0 {
c.log.Debug("No resources, skip")
return
}

for _, obj := range objs {
addToQueue(reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
},
})
c.log.Debug(fmt.Sprintf("Resource %s/%s enqueued", obj.GetNamespace(), obj.GetName()))
}
}

func nextScheduleTimeDuration(schedule cron.Schedule, now time.Time) time.Duration {
return schedule.Next(now).Sub(now)
}
137 changes: 137 additions & 0 deletions images/virtualization-artifact/pkg/controller/gc/cron_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
Copyright 2025 Flant JSC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gc

import (
"context"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
clock "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

dlog "github.com/deckhouse/deckhouse/pkg/log"
"github.com/deckhouse/virtualization-controller/pkg/common/testutil"
)

var _ = Describe("CronSource", func() {
const (
// Every day a 00:00
scheduleSpec = "0 0 * * *"
)

var (
log *dlog.Logger
baseCtx context.Context
fakeClient client.Client
mgr *fakeGCManager
fakeClock *clock.FakeClock
)

BeforeEach(func() {
log = testutil.NewNoOpLogger()
baseCtx = testutil.ToContext(context.Background(), log)

scheme := newScheme()
fakeClient = fake.NewClientBuilder().WithScheme(scheme).Build()

mgr = newFakeGCManager(fakeClient, 24*time.Hour, 10)

t := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
fakeClock = clock.NewFakeClock(t)
})

newSource := func(scheduleSpec string) *CronSource {
source, err := NewCronSource(scheduleSpec, mgr, log)
Expect(err).NotTo(HaveOccurred())
source.clock = fakeClock
return source
}

Context("with spawned objects", func() {
var (
ctx context.Context
cancel context.CancelFunc
source *CronSource
queue *fakeQueue
)

BeforeEach(func() {
ctx, cancel = context.WithCancel(baseCtx)
source = newSource(scheduleSpec)
queue = newFakeQueue()
})

AfterEach(func() {
cancel()
queue.ShutDown()
})

It("should not enqueue objects because ttl is not expired", func() {
spawnFakeObjects(5, 10, fakeObjectPhaseCompleted, fakeClient, fakeClock)
spawnFakeObjects(10, 10, fakeObjectPhasePending, fakeClient, fakeClock)
spawnFakeObjects(15, 10, fakeObjectPhaseRunning, fakeClient, fakeClock)

Expect(source.Start(ctx, queue)).To(Succeed())
time.Sleep(1 * time.Second)

// Go to 2025-01-02 01:00.
// CronSource should be started but not enqueued any objects because ttl is not expired.
fakeClock.Step(13 * time.Hour)

Consistently(func() int {
return len(queue.Requests())
}).WithTimeout(10 * time.Second).Should(Equal(0))
})

It("should enqueue 10 objects because ttl is not expired but objects completed and them more that maxCount", func() {
spawnFakeObjects(10, 11, fakeObjectPhaseCompleted, fakeClient, fakeClock)
spawnFakeObjects(10, 11, fakeObjectPhasePending, fakeClient, fakeClock)
spawnFakeObjects(10, 11, fakeObjectPhaseRunning, fakeClient, fakeClock)

Expect(source.Start(ctx, queue)).To(Succeed())
time.Sleep(1 * time.Second)

// Go to 2025-01-02 01:00.
// CronSource should be started and enqueued 10 objects.
fakeClock.Step(13 * time.Hour)

Eventually(func() int {
return len(queue.Requests())
}).WithTimeout(10 * time.Second).Should(Equal(10))
})

It("should enqueue 100 objects in completed state", func() {
spawnFakeObjects(10, 10, fakeObjectPhaseCompleted, fakeClient, fakeClock)
spawnFakeObjects(10, 10, fakeObjectPhasePending, fakeClient, fakeClock)
spawnFakeObjects(10, 10, fakeObjectPhaseRunning, fakeClient, fakeClock)

Expect(source.Start(ctx, queue)).To(Succeed())
time.Sleep(1 * time.Second)

// Go to 2025-01-03 01:00.
// CronSource should be started and enqueued 100 objects.
fakeClock.Step(37 * time.Hour)

Eventually(func() int {
return len(queue.Requests())
}).WithTimeout(10 * time.Second).Should(Equal(100))
})
})
})
Loading
Loading