@@ -22,6 +22,7 @@ import (
2222 "time"
2323
2424 "github.com/robfig/cron/v3"
25+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2526 "k8s.io/apimachinery/pkg/types"
2627 "k8s.io/client-go/util/workqueue"
2728 "k8s.io/utils/clock"
@@ -33,33 +34,41 @@ import (
3334 "github.com/deckhouse/virtualization-controller/pkg/logger"
3435)
3536
37+ /**
38+ CronSource is an implementation of the controller-runtime Source interface.
39+ It periodically triggers and emit events for a list of objects.
40+
41+ The component is independent of kubernetes client: developer should implement
42+ ObjectLister interface that CronSource will use to determine what to enqueue on trigger.
43+
44+ NewSingleObjectLister can be used if the main objective is to get periodical event,
45+ but specific namespace and name are not important. Also, for this situation
46+ object name can be used to distinguish cron trigger from the kubernetes trigger.
47+ */
48+
3649var _ source.Source = & CronSource {}
3750
3851const sourceName = "CronSource"
3952
40- type SourceGCManager interface {
41- ListForDelete (ctx context.Context , now time.Time ) ([]client.Object , error )
42- }
43-
44- func NewCronSource (scheduleSpec string , mgr SourceGCManager , log * log.Logger ) (* CronSource , error ) {
53+ func NewCronSource (scheduleSpec string , objLister ObjectLister , log * log.Logger ) (* CronSource , error ) {
4554 schedule , err := cron .ParseStandard (scheduleSpec )
4655 if err != nil {
4756 return nil , fmt .Errorf ("parsing standard spec %q: %w" , scheduleSpec , err )
4857 }
4958
5059 return & CronSource {
51- schedule : schedule ,
52- mgr : mgr ,
53- log : log .With ("WatchSource " , sourceName ),
54- clock : & clock.RealClock {},
60+ schedule : schedule ,
61+ objLister : objLister ,
62+ log : log .With ("watchSource " , sourceName ),
63+ clock : & clock.RealClock {},
5564 }, nil
5665}
5766
5867type CronSource struct {
59- schedule cron.Schedule
60- mgr SourceGCManager
61- log * log.Logger
62- clock clock.Clock
68+ schedule cron.Schedule
69+ objLister ObjectLister
70+ log * log.Logger
71+ clock clock.Clock
6372}
6473
6574func (c * CronSource ) Start (ctx context.Context , queue workqueue.TypedRateLimitingInterface [reconcile.Request ]) error {
@@ -70,28 +79,29 @@ func (c *CronSource) Start(ctx context.Context, queue workqueue.TypedRateLimitin
7079 case <- ctx .Done ():
7180 return
7281 case <- c .clock .After (nextTime ):
73- c .addObjects (ctx , queue .Add )
82+ c .enqueueObjects (ctx , queue .Add )
7483 nextTime = nextScheduleTimeDuration (c .schedule , c .clock .Now ())
7584 }
7685 }
7786 }()
7887 return nil
7988}
8089
81- func (c * CronSource ) addObjects (ctx context.Context , addToQueue func (reconcile.Request )) {
82- objs , err := c .mgr .ListForDelete (ctx , c .clock .Now ())
90+ func (c * CronSource ) enqueueObjects (ctx context.Context , queueAddFunc func (reconcile.Request )) {
91+ now := c .clock .Now ()
92+ objs , err := c .objLister .List (ctx , now )
8393 if err != nil {
8494 c .log .Error ("Failed to get ObjectList for delete" , logger .SlogErr (err ))
8595 return
8696 }
8797
8898 if len (objs ) == 0 {
89- c .log .Debug ("No resources, skip" )
99+ c .log .Debug (fmt . Sprintf ( "No resources at %s , skip queueing" , now ) )
90100 return
91101 }
92102
93103 for _ , obj := range objs {
94- addToQueue (reconcile.Request {
104+ queueAddFunc (reconcile.Request {
95105 NamespacedName : types.NamespacedName {
96106 Namespace : obj .GetNamespace (),
97107 Name : obj .GetName (),
@@ -104,3 +114,33 @@ func (c *CronSource) addObjects(ctx context.Context, addToQueue func(reconcile.R
104114func nextScheduleTimeDuration (schedule cron.Schedule , now time.Time ) time.Duration {
105115 return schedule .Next (now ).Sub (now )
106116}
117+
118+ type ObjectLister interface {
119+ List (ctx context.Context , now time.Time ) ([]client.Object , error )
120+ }
121+
122+ type ObjectListerImpl struct {
123+ ListFunc func (ctx context.Context , now time.Time ) ([]client.Object , error )
124+ }
125+
126+ func (o * ObjectListerImpl ) List (ctx context.Context , now time.Time ) ([]client.Object , error ) {
127+ if o .ListFunc == nil {
128+ return nil , nil
129+ }
130+ return o .ListFunc (ctx , now )
131+ }
132+
133+ func NewObjectLister (listFunc func (ctx context.Context , now time.Time ) ([]client.Object , error )) * ObjectListerImpl {
134+ return & ObjectListerImpl {listFunc }
135+ }
136+
137+ func NewSingleObjectLister (namespace , name string ) * ObjectListerImpl {
138+ return & ObjectListerImpl {ListFunc : func (ctx context.Context , now time.Time ) ([]client.Object , error ) {
139+ return []client.Object {& metav1.PartialObjectMetadata {
140+ ObjectMeta : metav1.ObjectMeta {
141+ Namespace : namespace ,
142+ Name : name ,
143+ },
144+ }}, nil
145+ }}
146+ }
0 commit comments