Skip to content

Commit d3e8030

Browse files
committed
run-resourcewatch: Re-add a retry loop for git commands
1 parent 18296e3 commit d3e8030

File tree

3 files changed

+53
-46
lines changed

3 files changed

+53
-46
lines changed

pkg/resourcewatch/operator/starter.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ import (
1919
"k8s.io/klog/v2"
2020
)
2121

22-
type observationSource func(ctx context.Context, log logr.Logger, resourceC chan<- *observe.ResourceObservation) chan struct{}
23-
type observationSink func(<-chan *observe.ResourceObservation) chan struct{}
22+
type observationSource func(context.Context, logr.Logger, chan<- *observe.ResourceObservation) chan struct{}
23+
type observationSink func(context.Context, <-chan *observe.ResourceObservation) chan struct{}
2424

2525
// this doesn't appear to handle restarts cleanly. To do so it would need to compare the resource version that it is applying
2626
// to the resource version present and it would need to handle unobserved deletions properly. both are possible, neither is easy.
@@ -88,7 +88,7 @@ func RunResourceWatch(toJsonPath, fromJsonPath string) error {
8888
log := klog.FromContext(ctx)
8989

9090
sourceFinished := source(ctx, log, resourceC)
91-
sinkFinished := sink(resourceC)
91+
sinkFinished := sink(ctx, resourceC)
9292

9393
// Wait for the source and sink to finish.
9494
select {
@@ -263,16 +263,16 @@ func gitSink() (observationSink, error) {
263263
return nil, err
264264
}
265265

266-
return func(resourceC <-chan *observe.ResourceObservation) chan struct{} {
266+
return func(ctx context.Context, resourceC <-chan *observe.ResourceObservation) chan struct{} {
267267
finished := make(chan struct{})
268268
go func() {
269269
defer close(finished)
270270
for observation := range resourceC {
271-
gitWrite(gitStorage, observation)
271+
gitWrite(ctx, gitStorage, observation)
272272
}
273273

274274
// We disable GC while we're writing, so run it after we're done.
275-
if err := gitStorage.GC(); err != nil {
275+
if err := gitStorage.GC(ctx); err != nil {
276276
klog.Errorf("Failed to run git gc with error %v", err)
277277
}
278278
}()
@@ -288,26 +288,26 @@ func gitInitStorage() (*storage.GitStorage, error) {
288288
return storage.NewGitStorage(repositoryPath)
289289
}
290290

291-
func gitWrite(gitStorage *storage.GitStorage, observation *observe.ResourceObservation) {
291+
func gitWrite(ctx context.Context, gitStorage *storage.GitStorage, observation *observe.ResourceObservation) {
292292
gvr := schema.GroupVersionResource{
293293
Group: observation.Group,
294294
Version: observation.Version,
295295
Resource: observation.Resource,
296296
}
297297
switch observation.ObservationType {
298298
case observe.ObservationTypeAdd:
299-
gitStorage.OnAdd(observation.ObservationTime, gvr, observation.Object)
299+
gitStorage.OnAdd(ctx, observation.ObservationTime, gvr, observation.Object)
300300
case observe.ObservationTypeUpdate:
301-
gitStorage.OnUpdate(observation.ObservationTime, gvr, observation.OldObject, observation.Object)
301+
gitStorage.OnUpdate(ctx, observation.ObservationTime, gvr, observation.OldObject, observation.Object)
302302
case observe.ObservationTypeDelete:
303-
gitStorage.OnDelete(observation.ObservationTime, gvr, observation.Object)
303+
gitStorage.OnDelete(ctx, observation.ObservationTime, gvr, observation.Object)
304304
}
305305
}
306306

307307
func jsonSink(file io.WriteCloser) (observationSink, error) {
308308
encoder := json.NewEncoder(file)
309309

310-
return func(resourceC <-chan *observe.ResourceObservation) chan struct{} {
310+
return func(ctx context.Context, resourceC <-chan *observe.ResourceObservation) chan struct{} {
311311
finished := make(chan struct{})
312312
go func() {
313313
defer func() {

pkg/resourcewatch/operator/starter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,6 @@ func BenchmarkGitSink(b *testing.B) {
4444
source(b.Context(), logr.Discard(), resourceC)
4545

4646
for b.Loop() {
47-
gitWrite(gitStorage, <-resourceC)
47+
gitWrite(b.Context(), gitStorage, <-resourceC)
4848
}
4949
}

pkg/resourcewatch/storage/git_store.go

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package storage
22

33
import (
4+
"context"
45
"fmt"
56
"os"
67
"os/exec"
@@ -17,6 +18,7 @@ import (
1718
"k8s.io/apimachinery/pkg/runtime"
1819
"k8s.io/apimachinery/pkg/runtime/schema"
1920
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21+
"k8s.io/apimachinery/pkg/util/wait"
2022
"k8s.io/client-go/tools/cache"
2123
"k8s.io/klog/v2"
2224
"sigs.k8s.io/yaml"
@@ -56,12 +58,12 @@ func NewGitStorage(path string) (*GitStorage, error) {
5658
return storage, nil
5759
}
5860

59-
func (s *GitStorage) GC() error {
60-
return s.execGit("gc")
61+
func (s *GitStorage) GC(ctx context.Context) error {
62+
return s.execGit(ctx, "gc")
6163
}
6264

6365
// handle handles different operations on git
64-
func (s *GitStorage) handle(timestamp time.Time, gvr schema.GroupVersionResource, oldObj, obj *unstructured.Unstructured, delete bool) {
66+
func (s *GitStorage) handle(ctx context.Context, timestamp time.Time, gvr schema.GroupVersionResource, oldObj, obj *unstructured.Unstructured, delete bool) {
6567
filePath, content, err := decodeUnstructuredObject(gvr, obj)
6668
if err != nil {
6769
klog.Warningf("Decoding %q failed: %v", filePath, err)
@@ -89,14 +91,14 @@ func (s *GitStorage) handle(timestamp time.Time, gvr schema.GroupVersionResource
8991
// Add it first before removing it.
9092
if os.IsNotExist(err) {
9193
klog.Info("Observed delete of file we haven't previously observed. Adding it first.")
92-
s.handle(timestamp, gvr, nil, obj, false)
93-
s.handle(timestamp, gvr, nil, obj, true)
94+
s.handle(ctx, timestamp, gvr, nil, obj, false)
95+
s.handle(ctx, timestamp, gvr, nil, obj, true)
9496
return
9597
} else {
9698
klog.Errorf("Error removing %q: %v", filePath, err)
9799
}
98100
}
99-
if err := s.commitRemove(timestamp, filePath, "unknown", ocCommand); err != nil {
101+
if err := s.commitRemove(ctx, timestamp, filePath, "unknown", ocCommand); err != nil {
100102
klog.Error(err)
101103
}
102104

@@ -119,33 +121,33 @@ func (s *GitStorage) handle(timestamp time.Time, gvr schema.GroupVersionResource
119121
switch operation {
120122
case gitOpAdded:
121123
klog.Infof("Calling commitAdd for %s", filePath)
122-
if err := s.commitAdd(timestamp, filePath, modifyingUser, ocCommand); err != nil {
124+
if err := s.commitAdd(ctx, timestamp, filePath, modifyingUser, ocCommand); err != nil {
123125
klog.Error(err)
124126
}
125127
case gitOpModified:
126128
klog.Infof("Calling commitModify for %s", filePath)
127-
if err := s.commitModify(timestamp, filePath, modifyingUser, ocCommand); err != nil {
129+
if err := s.commitModify(ctx, timestamp, filePath, modifyingUser, ocCommand); err != nil {
128130
klog.Error(err)
129131
}
130132
default:
131133
klog.Errorf("unhandled case for %s: %d", filePath, operation)
132134
}
133135
}
134136

135-
func (s *GitStorage) OnAdd(timestamp time.Time, gvr schema.GroupVersionResource, obj interface{}) {
137+
func (s *GitStorage) OnAdd(ctx context.Context, timestamp time.Time, gvr schema.GroupVersionResource, obj interface{}) {
136138
objUnstructured := obj.(*unstructured.Unstructured)
137139

138-
s.handle(timestamp, gvr, nil, objUnstructured, false)
140+
s.handle(ctx, timestamp, gvr, nil, objUnstructured, false)
139141
}
140142

141-
func (s *GitStorage) OnUpdate(timestamp time.Time, gvr schema.GroupVersionResource, oldObj, obj interface{}) {
143+
func (s *GitStorage) OnUpdate(ctx context.Context, timestamp time.Time, gvr schema.GroupVersionResource, oldObj, obj interface{}) {
142144
objUnstructured := obj.(*unstructured.Unstructured)
143145
oldObjUnstructured := oldObj.(*unstructured.Unstructured)
144146

145-
s.handle(timestamp, gvr, oldObjUnstructured, objUnstructured, false)
147+
s.handle(ctx, timestamp, gvr, oldObjUnstructured, objUnstructured, false)
146148
}
147149

148-
func (s *GitStorage) OnDelete(timestamp time.Time, gvr schema.GroupVersionResource, obj interface{}) {
150+
func (s *GitStorage) OnDelete(ctx context.Context, timestamp time.Time, gvr schema.GroupVersionResource, obj interface{}) {
149151
objUnstructured, ok := obj.(*unstructured.Unstructured)
150152
if !ok {
151153
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@@ -160,7 +162,7 @@ func (s *GitStorage) OnDelete(timestamp time.Time, gvr schema.GroupVersionResour
160162
}
161163
}
162164

163-
s.handle(timestamp, gvr, nil, objUnstructured, true)
165+
s.handle(ctx, timestamp, gvr, nil, objUnstructured, true)
164166
}
165167

166168
// guessAtModifyingUsers tries to figure out who modified the resource
@@ -223,62 +225,67 @@ func resourceFilename(gvr schema.GroupVersionResource, namespace, name string) s
223225
return filepath.Join("namespaces", namespace, groupStr, gvr.Resource, name+".yaml")
224226
}
225227

226-
func (s *GitStorage) execGit(args ...string) error {
228+
func (s *GitStorage) execGit(ctx context.Context, args ...string) error {
227229
// Disable automatic garbage collection to avoid racing with other processes.
228230
args = append([]string{"-c", "gc.auto=0"}, args...)
229231

230-
osCommand := exec.Command("git", args...)
231-
osCommand.Dir = s.path
232-
output, err := osCommand.CombinedOutput()
233-
if err != nil {
234-
klog.Errorf("Ran git %v\n%v\n\n", args, string(output))
235-
return err
236-
}
237-
return nil
232+
// NOTE(mdbooth): I thought I saw a phantom race since adding the gc.auto=0
233+
// flag, but I haven't been able to reproduce it since. This retry loop is
234+
// hopefully unnecessary.
235+
return wait.PollUntilContextTimeout(ctx, 1*time.Second, 15*time.Second, true, func(ctx context.Context) (bool, error) {
236+
osCommand := exec.Command("git", args...)
237+
osCommand.Dir = s.path
238+
output, err := osCommand.CombinedOutput()
239+
if err != nil {
240+
klog.Errorf("Ran git %v\n%v\n\n", args, string(output))
241+
return false, err
242+
}
243+
return true, nil
244+
})
238245
}
239246

240-
func (s *GitStorage) commit(timestamp time.Time, path, author, commitMessage string) error {
247+
func (s *GitStorage) commit(ctx context.Context, timestamp time.Time, path, author, commitMessage string) error {
241248
authorString := fmt.Sprintf("%s <[email protected]>", author)
242249
dateString := timestamp.Format(time.RFC3339)
243250

244-
return s.execGit("commit", "--author", authorString, "--date", dateString, "-m", commitMessage)
251+
return s.execGit(ctx, "commit", "--author", authorString, "--date", dateString, "-m", commitMessage)
245252
}
246253

247-
func (s *GitStorage) commitAdd(timestamp time.Time, path, author, ocCommand string) error {
248-
if err := s.execGit("add", path); err != nil {
254+
func (s *GitStorage) commitAdd(ctx context.Context, timestamp time.Time, path, author, ocCommand string) error {
255+
if err := s.execGit(ctx, "add", path); err != nil {
249256
return err
250257
}
251258

252259
commitMessage := fmt.Sprintf("added %s", ocCommand)
253-
if err := s.commit(timestamp, path, author, commitMessage); err != nil {
260+
if err := s.commit(ctx, timestamp, path, author, commitMessage); err != nil {
254261
return err
255262
}
256263

257264
klog.Infof("Add: %v -- %v added %v", path, author, ocCommand)
258265
return nil
259266
}
260267

261-
func (s *GitStorage) commitModify(timestamp time.Time, path, author, ocCommand string) error {
262-
if err := s.execGit("add", path); err != nil {
268+
func (s *GitStorage) commitModify(ctx context.Context, timestamp time.Time, path, author, ocCommand string) error {
269+
if err := s.execGit(ctx, "add", path); err != nil {
263270
return err
264271
}
265272

266273
commitMessage := fmt.Sprintf("modifed %s", ocCommand)
267-
if err := s.commit(timestamp, path, author, commitMessage); err != nil {
274+
if err := s.commit(ctx, timestamp, path, author, commitMessage); err != nil {
268275
return err
269276
}
270277

271278
klog.Infof("Modified: %v -- %v updated %v", path, author, ocCommand)
272279
return nil
273280
}
274281

275-
func (s *GitStorage) commitRemove(timestamp time.Time, path, author, ocCommand string) error {
276-
if err := s.execGit("rm", path); err != nil {
282+
func (s *GitStorage) commitRemove(ctx context.Context, timestamp time.Time, path, author, ocCommand string) error {
283+
if err := s.execGit(ctx, "rm", path); err != nil {
277284
return err
278285
}
279286

280287
commitMessage := fmt.Sprintf("removed %s", ocCommand)
281-
if err := s.commit(timestamp, path, author, commitMessage); err != nil {
288+
if err := s.commit(ctx, timestamp, path, author, commitMessage); err != nil {
282289
return err
283290
}
284291

0 commit comments

Comments
 (0)