Skip to content

Commit abb8cb3

Browse files
committed
run-resourcewatch: Remove goroutines from gitstorage
1 parent c751d92 commit abb8cb3

File tree

2 files changed

+3
-103
lines changed

2 files changed

+3
-103
lines changed

pkg/resourcewatch/operator/starter_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"compress/gzip"
55
"os"
66
"testing"
7-
"time"
87

98
"github.com/go-logr/logr"
109
"github.com/openshift/origin/pkg/resourcewatch/observe"
@@ -47,8 +46,4 @@ func BenchmarkGitSink(b *testing.B) {
4746
for b.Loop() {
4847
gitWrite(gitStorage, <-resourceC)
4948
}
50-
51-
// gitStorage creates unmanaged go threads which can prevent us from cleaning up.
52-
// Until we address that, just give them a chance to finish
53-
time.Sleep(5 * time.Second)
5449
}

pkg/resourcewatch/storage/git_store.go

Lines changed: 3 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"os/exec"
77
"path/filepath"
88
"strings"
9-
"sync"
109
"time"
1110

1211
"k8s.io/apimachinery/pkg/util/wait"
@@ -24,46 +23,9 @@ import (
2423
"sigs.k8s.io/yaml"
2524
)
2625

27-
type workingSet struct {
28-
currentlyWorking sets.String
29-
lock sync.RWMutex
30-
}
31-
32-
func (s *workingSet) isWorkingOn(key string) bool {
33-
s.lock.RLock()
34-
defer s.lock.RUnlock()
35-
return s.currentlyWorking.Has(key)
36-
}
37-
38-
func (s *workingSet) reserve(key string) {
39-
s.lock.Lock()
40-
defer s.lock.Unlock()
41-
s.currentlyWorking.Insert(key)
42-
}
43-
44-
func (s *workingSet) release(key string) {
45-
s.lock.Lock()
46-
defer s.lock.Unlock()
47-
s.currentlyWorking.Delete(key)
48-
}
49-
50-
func (s *workingSet) waitUntilAvailable(key string) error {
51-
return wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) {
52-
if s.isWorkingOn(key) {
53-
return false, nil
54-
}
55-
return true, nil
56-
})
57-
}
58-
5926
type GitStorage struct {
6027
repo *git.Repository
6128
path string
62-
63-
currentlyRecording workingSet
64-
65-
// Writing to Git repository must be synced otherwise Git will freak out
66-
sync.Mutex
6729
}
6830

6931
type gitOperation int
@@ -91,33 +53,12 @@ func NewGitStorage(path string) (*GitStorage, error) {
9153
return nil, err
9254
}
9355
storage := &GitStorage{path: path, repo: repo}
94-
storage.currentlyRecording.currentlyWorking = sets.String{}
9556

9657
return storage, nil
9758
}
9859

9960
// handle handles different operations on git
10061
func (s *GitStorage) handle(gvr schema.GroupVersionResource, oldObj, obj *unstructured.Unstructured, delete bool) {
101-
// notifications for resources come in a single threaded stream per-resource.
102-
// this means there will never be contention on a single file.
103-
// we will lock just before the commit itself.
104-
105-
// Allowing files to be written while a commit is in progress leads to commit failures due to unstaged changes
106-
// moving the lock to the top of the handler to make the action atomic.
107-
// E0706 02:06:26.489945 2444 git_store.go:338] Ran git add "namespaces/openshift-cloud-credential-operator/core/services/cco-metrics.yaml" && git commit --author="modified-unknown <[email protected]>" -m "modifed services/cco-metrics -n openshift-cloud-credential-operator"
108-
// On branch master
109-
// Changes not staged for commit:
110-
// (use "git add <file>..." to update what will be committed)
111-
// (use "git restore <file>..." to discard changes in working directory)
112-
// modified: namespaces/openshift-apiserver/core/pods/apiserver-856c47d994-47cf7.yaml
113-
//
114-
// Untracked files:
115-
// (use "git add <file>..." to include in what will be committed)
116-
// namespaces/openshift-etcd/core/pods/etcd-ci-op-g2xjpfr4-ed5cd-gqjcq-master-0.yaml
117-
118-
s.Lock()
119-
defer s.Unlock()
120-
12162
filePath, content, err := decodeUnstructuredObject(gvr, obj)
12263
if err != nil {
12364
klog.Warningf("Decoding %q failed: %v", filePath, err)
@@ -198,38 +139,14 @@ func (s *GitStorage) handle(gvr schema.GroupVersionResource, oldObj, obj *unstru
198139
func (s *GitStorage) OnAdd(gvr schema.GroupVersionResource, obj interface{}) {
199140
objUnstructured := obj.(*unstructured.Unstructured)
200141

201-
// serialize updates to individual files
202-
key := fmt.Sprintf("%s/%s/%s/%s/%s", gvr.Group, gvr.Version, gvr.Resource, objUnstructured.GetNamespace(), objUnstructured.GetName())
203-
if err := s.currentlyRecording.waitUntilAvailable(key); err != nil {
204-
klog.Error(err)
205-
return
206-
}
207-
s.currentlyRecording.reserve(key)
208-
209-
// start new go func to allow parallel processing where possible and to avoid blocking all progress on retries.
210-
go func() {
211-
defer s.currentlyRecording.release(key)
212-
s.handle(gvr, nil, objUnstructured, false)
213-
}()
142+
s.handle(gvr, nil, objUnstructured, false)
214143
}
215144

216145
func (s *GitStorage) OnUpdate(gvr schema.GroupVersionResource, oldObj, obj interface{}) {
217146
objUnstructured := obj.(*unstructured.Unstructured)
218147
oldObjUnstructured := oldObj.(*unstructured.Unstructured)
219148

220-
// serialize updates to individual files
221-
key := fmt.Sprintf("%s/%s/%s/%s/%s", gvr.Group, gvr.Version, gvr.Resource, objUnstructured.GetNamespace(), objUnstructured.GetName())
222-
if err := s.currentlyRecording.waitUntilAvailable(key); err != nil {
223-
klog.Error(err)
224-
return
225-
}
226-
s.currentlyRecording.reserve(key)
227-
228-
// start new go func to allow parallel processing where possible and to avoid blocking all progress on retries.
229-
go func() {
230-
defer s.currentlyRecording.release(key)
231-
s.handle(gvr, oldObjUnstructured, objUnstructured, false)
232-
}()
149+
s.handle(gvr, oldObjUnstructured, objUnstructured, false)
233150
}
234151

235152
func (s *GitStorage) OnDelete(gvr schema.GroupVersionResource, obj interface{}) {
@@ -247,19 +164,7 @@ func (s *GitStorage) OnDelete(gvr schema.GroupVersionResource, obj interface{})
247164
}
248165
}
249166

250-
// serialize updates to individual files
251-
key := fmt.Sprintf("%s/%s/%s/%s/%s", gvr.Group, gvr.Version, gvr.Resource, objUnstructured.GetNamespace(), objUnstructured.GetName())
252-
if err := s.currentlyRecording.waitUntilAvailable(key); err != nil {
253-
klog.Error(err)
254-
return
255-
}
256-
s.currentlyRecording.reserve(key)
257-
258-
// start new go func to allow parallel processing where possible and to avoid blocking all progress on retries.
259-
go func() {
260-
defer s.currentlyRecording.release(key)
261-
s.handle(gvr, nil, objUnstructured, true)
262-
}()
167+
s.handle(gvr, nil, objUnstructured, true)
263168
}
264169

265170
// guessAtModifyingUsers tries to figure out who modified the resource

0 commit comments

Comments
 (0)