Skip to content

Commit 25f0996

Browse files
committed
run-resourcewatch: Re-add a retry loop for git commands
run-resourcewatch should no longer race with itself, but this adds robustness in case another process (like an impatient manual tester) is running git commands in the same repo.
1 parent da11db0 commit 25f0996

File tree

6 files changed

+58
-46
lines changed

6 files changed

+58
-46
lines changed

pkg/resourcewatch/git/git_store.go

Lines changed: 43 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package git
22

33
import (
4+
"context"
45
"fmt"
56
"os"
67
"os/exec"
@@ -17,6 +18,7 @@ import (
1718
"k8s.io/apimachinery/pkg/runtime"
1819
"k8s.io/apimachinery/pkg/runtime/schema"
1920
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21+
"k8s.io/apimachinery/pkg/util/wait"
2022
"k8s.io/client-go/tools/cache"
2123
"k8s.io/klog/v2"
2224
"sigs.k8s.io/yaml"
@@ -56,12 +58,12 @@ func NewGitStorage(path string) (*GitStorage, error) {
5658
return storage, nil
5759
}
5860

59-
func (s *GitStorage) GC() error {
60-
return s.execGit("gc")
61+
func (s *GitStorage) GC(ctx context.Context) error {
62+
return s.execGit(ctx, "gc")
6163
}
6264

6365
// handle handles different operations on git
64-
func (s *GitStorage) handle(timestamp time.Time, gvr schema.GroupVersionResource, oldObj, obj *unstructured.Unstructured, delete bool) {
66+
func (s *GitStorage) handle(ctx context.Context, timestamp time.Time, gvr schema.GroupVersionResource, oldObj, obj *unstructured.Unstructured, delete bool) {
6567
filePath, content, err := decodeUnstructuredObject(gvr, obj)
6668
if err != nil {
6769
klog.Warningf("Decoding %q failed: %v", filePath, err)
@@ -89,14 +91,14 @@ func (s *GitStorage) handle(timestamp time.Time, gvr schema.GroupVersionResource
8991
// Add it first before removing it.
9092
if os.IsNotExist(err) {
9193
klog.Info("Observed delete of file we haven't previously observed. Adding it first.")
92-
s.handle(timestamp, gvr, nil, obj, false)
93-
s.handle(timestamp, gvr, nil, obj, true)
94+
s.handle(ctx, timestamp, gvr, nil, obj, false)
95+
s.handle(ctx, timestamp, gvr, nil, obj, true)
9496
return
9597
} else {
9698
klog.Errorf("Error removing %q: %v", filePath, err)
9799
}
98100
}
99-
if err := s.commitRemove(timestamp, filePath, "unknown", ocCommand); err != nil {
101+
if err := s.commitRemove(ctx, timestamp, filePath, "unknown", ocCommand); err != nil {
100102
klog.Error(err)
101103
}
102104

@@ -119,33 +121,33 @@ func (s *GitStorage) handle(timestamp time.Time, gvr schema.GroupVersionResource
119121
switch operation {
120122
case gitOpAdded:
121123
klog.Infof("Calling commitAdd for %s", filePath)
122-
if err := s.commitAdd(timestamp, filePath, modifyingUser, ocCommand); err != nil {
124+
if err := s.commitAdd(ctx, timestamp, filePath, modifyingUser, ocCommand); err != nil {
123125
klog.Error(err)
124126
}
125127
case gitOpModified:
126128
klog.Infof("Calling commitModify for %s", filePath)
127-
if err := s.commitModify(timestamp, filePath, modifyingUser, ocCommand); err != nil {
129+
if err := s.commitModify(ctx, timestamp, filePath, modifyingUser, ocCommand); err != nil {
128130
klog.Error(err)
129131
}
130132
default:
131133
klog.Errorf("unhandled case for %s: %d", filePath, operation)
132134
}
133135
}
134136

135-
func (s *GitStorage) OnAdd(timestamp time.Time, gvr schema.GroupVersionResource, obj interface{}) {
137+
func (s *GitStorage) OnAdd(ctx context.Context, timestamp time.Time, gvr schema.GroupVersionResource, obj interface{}) {
136138
objUnstructured := obj.(*unstructured.Unstructured)
137139

138-
s.handle(timestamp, gvr, nil, objUnstructured, false)
140+
s.handle(ctx, timestamp, gvr, nil, objUnstructured, false)
139141
}
140142

141-
func (s *GitStorage) OnUpdate(timestamp time.Time, gvr schema.GroupVersionResource, oldObj, obj interface{}) {
143+
func (s *GitStorage) OnUpdate(ctx context.Context, timestamp time.Time, gvr schema.GroupVersionResource, oldObj, obj interface{}) {
142144
objUnstructured := obj.(*unstructured.Unstructured)
143145
oldObjUnstructured := oldObj.(*unstructured.Unstructured)
144146

145-
s.handle(timestamp, gvr, oldObjUnstructured, objUnstructured, false)
147+
s.handle(ctx, timestamp, gvr, oldObjUnstructured, objUnstructured, false)
146148
}
147149

148-
func (s *GitStorage) OnDelete(timestamp time.Time, gvr schema.GroupVersionResource, obj interface{}) {
150+
func (s *GitStorage) OnDelete(ctx context.Context, timestamp time.Time, gvr schema.GroupVersionResource, obj interface{}) {
149151
objUnstructured, ok := obj.(*unstructured.Unstructured)
150152
if !ok {
151153
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@@ -160,7 +162,7 @@ func (s *GitStorage) OnDelete(timestamp time.Time, gvr schema.GroupVersionResour
160162
}
161163
}
162164

163-
s.handle(timestamp, gvr, nil, objUnstructured, true)
165+
s.handle(ctx, timestamp, gvr, nil, objUnstructured, true)
164166
}
165167

166168
// guessAtModifyingUsers tries to figure out who modified the resource
@@ -223,62 +225,69 @@ func resourceFilename(gvr schema.GroupVersionResource, namespace, name string) s
223225
return filepath.Join("namespaces", namespace, groupStr, gvr.Resource, name+".yaml")
224226
}
225227

226-
func (s *GitStorage) execGit(args ...string) error {
228+
func (s *GitStorage) execGit(ctx context.Context, args ...string) error {
227229
// Disable automatic garbage collection to avoid racing with other processes.
228230
args = append([]string{"-c", "gc.auto=0"}, args...)
229231

230-
osCommand := exec.Command("git", args...)
231-
osCommand.Dir = s.path
232-
output, err := osCommand.CombinedOutput()
233-
if err != nil {
234-
klog.Errorf("Ran git %v\n%v\n\n", args, string(output))
235-
return err
236-
}
237-
return nil
232+
// The git store should no longer race with itself since we
233+
// disabled auto gc so this should never happen in a CI run. However,
234+
// manually executed git commands may still cause errors, so we have a retry
235+
// loop for robustness.
236+
return wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
237+
osCommand := exec.Command("git", args...)
238+
osCommand.Dir = s.path
239+
output, err := osCommand.CombinedOutput()
240+
if err != nil {
241+
klog.Errorf("Ran git %v\n%v\n\n", args, string(output))
242+
// Don't return the error or we'll stop polling.
243+
return false, nil
244+
}
245+
return true, nil
246+
})
238247
}
239248

240-
func (s *GitStorage) commit(timestamp time.Time, path, author, commitMessage string) error {
249+
func (s *GitStorage) commit(ctx context.Context, timestamp time.Time, path, author, commitMessage string) error {
241250
authorString := fmt.Sprintf("%s <[email protected]>", author)
242251
dateString := timestamp.Format(time.RFC3339)
243252

244-
return s.execGit("commit", "--author", authorString, "--date", dateString, "-m", commitMessage)
253+
return s.execGit(ctx, "commit", "--author", authorString, "--date", dateString, "-m", commitMessage)
245254
}
246255

247-
func (s *GitStorage) commitAdd(timestamp time.Time, path, author, ocCommand string) error {
248-
if err := s.execGit("add", path); err != nil {
256+
func (s *GitStorage) commitAdd(ctx context.Context, timestamp time.Time, path, author, ocCommand string) error {
257+
if err := s.execGit(ctx, "add", path); err != nil {
249258
return err
250259
}
251260

252261
commitMessage := fmt.Sprintf("added %s", ocCommand)
253-
if err := s.commit(timestamp, path, author, commitMessage); err != nil {
262+
if err := s.commit(ctx, timestamp, path, author, commitMessage); err != nil {
254263
return err
255264
}
256265

257266
klog.Infof("Add: %v -- %v added %v", path, author, ocCommand)
258267
return nil
259268
}
260269

261-
func (s *GitStorage) commitModify(timestamp time.Time, path, author, ocCommand string) error {
262-
if err := s.execGit("add", path); err != nil {
270+
func (s *GitStorage) commitModify(ctx context.Context, timestamp time.Time, path, author, ocCommand string) error {
271+
if err := s.execGit(ctx, "add", path); err != nil {
263272
return err
264273
}
265274

266275
commitMessage := fmt.Sprintf("modifed %s", ocCommand)
267-
if err := s.commit(timestamp, path, author, commitMessage); err != nil {
276+
if err := s.commit(ctx, timestamp, path, author, commitMessage); err != nil {
268277
return err
269278
}
270279

271280
klog.Infof("Modified: %v -- %v updated %v", path, author, ocCommand)
272281
return nil
273282
}
274283

275-
func (s *GitStorage) commitRemove(timestamp time.Time, path, author, ocCommand string) error {
276-
if err := s.execGit("rm", path); err != nil {
284+
func (s *GitStorage) commitRemove(ctx context.Context, timestamp time.Time, path, author, ocCommand string) error {
285+
if err := s.execGit(ctx, "rm", path); err != nil {
277286
return err
278287
}
279288

280289
commitMessage := fmt.Sprintf("removed %s", ocCommand)
281-
if err := s.commit(timestamp, path, author, commitMessage); err != nil {
290+
if err := s.commit(ctx, timestamp, path, author, commitMessage); err != nil {
282291
return err
283292
}
284293

pkg/resourcewatch/git/git_store_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func BenchmarkGitSink(b *testing.B) {
3333

3434
b.ResetTimer()
3535
for i := range b.N {
36-
gitWrite(gitStorage, resources[i])
36+
gitWrite(context.TODO(), gitStorage, resources[i])
3737
}
3838
}
3939

pkg/resourcewatch/git/sink.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package git
22

33
import (
4+
"context"
45
"os"
56

67
"github.com/go-logr/logr"
@@ -16,16 +17,16 @@ func Sink(log logr.Logger) (observe.ObservationSink, error) {
1617
return nil, err
1718
}
1819

19-
return func(log logr.Logger, resourceC <-chan *observe.ResourceObservation) chan struct{} {
20+
return func(ctx context.Context, log logr.Logger, resourceC <-chan *observe.ResourceObservation) chan struct{} {
2021
finished := make(chan struct{})
2122
go func() {
2223
defer close(finished)
2324
for observation := range resourceC {
24-
gitWrite(gitStorage, observation)
25+
gitWrite(ctx, gitStorage, observation)
2526
}
2627

2728
// We disable GC while we're writing, so run it after we're done.
28-
if err := gitStorage.GC(); err != nil {
29+
if err := gitStorage.GC(ctx); err != nil {
2930
log.Error(err, "Failed to run git gc")
3031
}
3132
}()
@@ -41,18 +42,18 @@ func gitInitStorage() (*GitStorage, error) {
4142
return NewGitStorage(repositoryPath)
4243
}
4344

44-
func gitWrite(gitStorage *GitStorage, observation *observe.ResourceObservation) {
45+
func gitWrite(ctx context.Context, gitStorage *GitStorage, observation *observe.ResourceObservation) {
4546
gvr := schema.GroupVersionResource{
4647
Group: observation.Group,
4748
Version: observation.Version,
4849
Resource: observation.Resource,
4950
}
5051
switch observation.ObservationType {
5152
case observe.ObservationTypeAdd:
52-
gitStorage.OnAdd(observation.ObservationTime, gvr, observation.Object)
53+
gitStorage.OnAdd(ctx, observation.ObservationTime, gvr, observation.Object)
5354
case observe.ObservationTypeUpdate:
54-
gitStorage.OnUpdate(observation.ObservationTime, gvr, observation.OldObject, observation.Object)
55+
gitStorage.OnUpdate(ctx, observation.ObservationTime, gvr, observation.OldObject, observation.Object)
5556
case observe.ObservationTypeDelete:
56-
gitStorage.OnDelete(observation.ObservationTime, gvr, observation.Object)
57+
gitStorage.OnDelete(ctx, observation.ObservationTime, gvr, observation.Object)
5758
}
5859
}

pkg/resourcewatch/json/json.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func Source(file io.ReadCloser) (observe.ObservationSource, error) {
4242
func Sink(file io.WriteCloser) (observe.ObservationSink, error) {
4343
encoder := json.NewEncoder(file)
4444

45-
return func(log logr.Logger, resourceC <-chan *observe.ResourceObservation) chan struct{} {
45+
return func(_ context.Context, log logr.Logger, resourceC <-chan *observe.ResourceObservation) chan struct{} {
4646
finished := make(chan struct{})
4747
go func() {
4848
defer func() {

pkg/resourcewatch/observe/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,4 @@ type ResourceObservation struct {
3232
}
3333

3434
type ObservationSource func(ctx context.Context, log logr.Logger, resourceC chan<- *ResourceObservation) chan struct{}
35-
type ObservationSink func(log logr.Logger, resourceC <-chan *ResourceObservation) chan struct{}
35+
type ObservationSink func(ctx context.Context, log logr.Logger, resourceC <-chan *ResourceObservation) chan struct{}

pkg/resourcewatch/operator/starter.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,18 @@ func RunResourceWatch(toJsonPath, fromJsonPath string) error {
7777

7878
// Observers emit observations to this channel. We use this channel as a buffer between the observers and the git writer.
7979
// Memory consumption will grow if we can't write quickly enough.
80-
resourceC := make(chan *observe.ResourceObservation, 1000000)
80+
// For reference, the captured test data from a 4.20 installation has ~17000 observations.
81+
resourceC := make(chan *observe.ResourceObservation, 10000)
8182

8283
sourceFinished := source(ctx, log, resourceC)
83-
sinkFinished := sink(log, resourceC)
84+
sinkFinished := sink(ctx, log, resourceC)
8485

8586
// Wait for the source and sink to finish.
8687
select {
8788
case <-sourceFinished:
8889
// The source finished. This will also happen if the context is cancelled.
8990
close(resourceC)
91+
log.Info("Source finished")
9092

9193
// Wait for the sink to finish writing queued observations.
9294
<-sinkFinished

0 commit comments

Comments
 (0)