@@ -17,157 +17,90 @@ limitations under the License.
17
17
package gc
18
18
19
19
import (
20
- "cmp"
21
20
"context"
22
21
"fmt"
23
- "slices"
24
22
"time"
25
23
26
24
"github.com/robfig/cron/v3"
27
- "k8s.io/apimachinery/pkg/api/meta"
28
- "k8s.io/apimachinery/pkg/runtime"
29
25
"k8s.io/apimachinery/pkg/types"
30
26
"k8s.io/client-go/util/workqueue"
27
+ "k8s.io/utils/clock"
31
28
"sigs.k8s.io/controller-runtime/pkg/client"
32
29
"sigs.k8s.io/controller-runtime/pkg/reconcile"
33
30
"sigs.k8s.io/controller-runtime/pkg/source"
34
31
35
32
"github.com/deckhouse/deckhouse/pkg/log"
36
- "github.com/deckhouse/virtualization-controller/pkg/common/object"
37
33
"github.com/deckhouse/virtualization-controller/pkg/logger"
38
34
)
39
35
40
- const sourceName = "CronSource"
41
-
42
- func NewCronSource (c client.Client ,
43
- standardSpec string ,
44
- objList client.ObjectList ,
45
- option CronSourceOption ,
46
- log * log.Logger ,
47
- ) * CronSource {
48
- return & CronSource {
49
- Client : c ,
50
- standardSpec : standardSpec ,
51
- objList : objList ,
52
- option : option ,
53
- log : log .With ("WatchSource" , sourceName ),
54
- }
55
- }
56
-
57
36
var _ source.Source = & CronSource {}
58
37
59
- type CronSource struct {
60
- client.Client
61
- standardSpec string
62
- objList client.ObjectList
63
- option CronSourceOption
64
- log * log.Logger
65
- }
38
+ const sourceName = "CronSource"
66
39
67
- type CronSourceOption struct {
68
- GetOlder func ( objList client. ObjectList ) client.ObjectList
40
+ type SourceGCManager interface {
41
+ ListForDelete ( ctx context. Context , now time. Time ) ([] client.Object , error )
69
42
}
70
43
71
- func NewDefaultCronSourceOption (objs client.ObjectList , ttl time.Duration , log * log.Logger ) CronSourceOption {
72
- return CronSourceOption {
73
- GetOlder : DefaultGetOlder (objs , ttl , 10 , log ),
44
+ func NewCronSource (scheduleSpec string , mgr SourceGCManager , log * log.Logger ) (* CronSource , error ) {
45
+ schedule , err := cron .ParseStandard (scheduleSpec )
46
+ if err != nil {
47
+ return nil , fmt .Errorf ("parsing standard spec %q: %w" , scheduleSpec , err )
74
48
}
75
- }
76
49
77
- func DefaultGetOlder (objs client.ObjectList , ttl time.Duration , maxCount int , log * log.Logger ) func (objList client.ObjectList ) client.ObjectList {
78
- return func (objList client.ObjectList ) client.ObjectList {
79
- var expiredItems []runtime.Object
80
- var notExpiredItems []runtime.Object
81
-
82
- if err := meta .EachListItem (objList , func (o runtime.Object ) error {
83
- obj , ok := o .(client.Object )
84
- if ! ok {
85
- return nil
86
- }
87
- if object .GetAge (obj ) > ttl {
88
- expiredItems = append (expiredItems , o )
89
- } else {
90
- notExpiredItems = append (notExpiredItems , o )
91
- }
92
-
93
- return nil
94
- }); err != nil {
95
- log .Error ("failed to populate list" , logger .SlogErr (err ))
96
- }
97
-
98
- if maxCount != 0 && len (notExpiredItems ) > maxCount {
99
- slices .SortFunc (notExpiredItems , func (a , b runtime.Object ) int {
100
- aObj , _ := a .(client.Object )
101
- bObj , _ := b .(client.Object )
102
-
103
- return cmp .Compare (object .GetAge (aObj ), object .GetAge (bObj ))
104
- })
105
- expiredItems = append (expiredItems , notExpiredItems [maxCount :]... )
106
- }
50
+ return & CronSource {
51
+ schedule : schedule ,
52
+ mgr : mgr ,
53
+ log : log .With ("WatchSource" , sourceName ),
54
+ clock : & clock.RealClock {},
55
+ }, nil
56
+ }
107
57
108
- if err := meta . SetList ( objs , expiredItems ); err != nil {
109
- log . Error ( "failed to set list" , logger . SlogErr ( err ))
110
- }
111
- return objs
112
- }
58
+ type CronSource struct {
59
+ schedule cron. Schedule
60
+ mgr SourceGCManager
61
+ log * log. Logger
62
+ clock clock. Clock
113
63
}
114
64
115
65
func (c * CronSource ) Start (ctx context.Context , queue workqueue.TypedRateLimitingInterface [reconcile.Request ]) error {
116
- schedule , err := cron .ParseStandard (c .standardSpec )
117
- if err != nil {
118
- return fmt .Errorf ("parsing standard spec %q: %w" , c .standardSpec , err )
119
- }
120
- work := func () {
121
- if err = meta .SetList (c .objList , nil ); err != nil {
122
- c .log .Error ("failed to reset resource list" , logger .SlogErr (err ))
123
- return
124
- }
125
- if err = c .List (ctx , c .objList ); err != nil {
126
- c .log .Error ("failed to listing resources" , logger .SlogErr (err ))
127
- return
128
- }
129
- if meta .LenList (c .objList ) == 0 {
130
- c .log .Debug ("no resources, skip" )
131
- return
132
- }
133
- if c .option .GetOlder != nil {
134
- c .objList = c .option .GetOlder (c .objList )
135
- }
136
- if err = meta .EachListItem (c .objList , func (object runtime.Object ) error {
137
- obj , ok := object .(client.Object )
138
- if ! ok {
139
- c .log .Error (fmt .Sprintf ("%s's type isn't metav1.Object" , object .GetObjectKind ().GroupVersionKind ().String ()))
140
- return nil
141
- }
142
-
143
- queue .Add (reconcile.Request {
144
- NamespacedName : types.NamespacedName {
145
- Namespace : obj .GetNamespace (),
146
- Name : obj .GetName (),
147
- },
148
- })
149
- c .log .Debug (fmt .Sprintf ("resource %s/%s enqueued" , obj .GetNamespace (), obj .GetName ()))
150
- return nil
151
- }); err != nil {
152
- c .log .Error ("failed to enqueueing resources" , logger .SlogErr (err ))
153
- return
154
- }
155
- }
156
- ta := nextScheduleTimeDuration (schedule , time .Now ())
66
+ nextTime := nextScheduleTimeDuration (c .schedule , c .clock .Now ())
157
67
go func () {
158
68
for {
159
69
select {
160
70
case <- ctx .Done ():
161
71
return
162
- case <- time . After (ta ):
163
- work ( )
164
- ta = nextScheduleTimeDuration (schedule , time .Now ())
72
+ case <- c . clock . After (nextTime ):
73
+ c . addObjects ( ctx , queue . Add )
74
+ nextTime = nextScheduleTimeDuration (c . schedule , c . clock .Now ())
165
75
}
166
76
}
167
77
}()
168
78
return nil
169
79
}
170
80
81
+ func (c * CronSource ) addObjects (ctx context.Context , addToQueue func (reconcile.Request )) {
82
+ objs , err := c .mgr .ListForDelete (ctx , c .clock .Now ())
83
+ if err != nil {
84
+ c .log .Error ("Failed to get ObjectList for delete" , logger .SlogErr (err ))
85
+ return
86
+ }
87
+
88
+ if len (objs ) == 0 {
89
+ c .log .Debug ("No resources, skip" )
90
+ return
91
+ }
92
+
93
+ for _ , obj := range objs {
94
+ addToQueue (reconcile.Request {
95
+ NamespacedName : types.NamespacedName {
96
+ Namespace : obj .GetNamespace (),
97
+ Name : obj .GetName (),
98
+ },
99
+ })
100
+ c .log .Debug (fmt .Sprintf ("Resource %s/%s enqueued" , obj .GetNamespace (), obj .GetName ()))
101
+ }
102
+ }
103
+
171
104
func nextScheduleTimeDuration (schedule cron.Schedule , now time.Time ) time.Duration {
172
105
return schedule .Next (now ).Sub (now )
173
106
}
0 commit comments