Skip to content

Commit c7cf754

Browse files
committed
replace mutex+map with sync.Map
Signed-off-by: Etai Lev Ran <[email protected]>
1 parent 7a63312 commit c7cf754

File tree

2 files changed

+44
-58
lines changed

2 files changed

+44
-58
lines changed

pkg/epp/datalayer/attributemap.go

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -36,59 +36,54 @@ type AttributeMap interface {
3636

3737
// Attributes provides a goroutine safe implementation of AttributeMap.
3838
type Attributes struct {
39-
mu sync.RWMutex
40-
data map[string]Cloneable
39+
data sync.Map
4140
}
4241

4342
// NewAttributes return a new attribute map instance.
4443
func NewAttributes() *Attributes {
4544
return &Attributes{
46-
data: make(map[string]Cloneable),
45+
data: sync.Map{},
4746
}
4847
}
4948

5049
// Put adds (or updates) an attribute in the map.
5150
func (a *Attributes) Put(key string, value Cloneable) {
52-
a.mu.Lock()
53-
defer a.mu.Unlock()
54-
55-
a.data[key] = value // TODO: Clone into map?
51+
a.data.Store(key, value) // TODO: Clone into map?
5652
}
5753

5854
// Get returns an attribute from the map.
5955
func (a *Attributes) Get(key string) (Cloneable, bool) {
60-
a.mu.RLock()
61-
defer a.mu.RUnlock()
62-
63-
val, ok := a.data[key]
56+
val, ok := a.data.Load(key)
6457
if !ok {
6558
return nil, false
6659
}
67-
return val.Clone(), true
60+
if cloneable, ok := val.(Cloneable); ok {
61+
return cloneable.Clone(), true
62+
}
63+
return nil, false // shouldn't happen since Put accepts Cloneables only
6864
}
6965

7066
// Keys returns an array of all the names of attributes stored in the map.
7167
func (a *Attributes) Keys() []string {
72-
a.mu.RLock()
73-
defer a.mu.RUnlock()
74-
75-
keys := make([]string, 0, len(a.data))
76-
for k := range a.data {
77-
keys = append(keys, k)
78-
}
68+
keys := []string{}
69+
a.data.Range(func(key, _ any) bool {
70+
if k, ok := key.(string); ok {
71+
keys = append(keys, k)
72+
}
73+
return true // continue iteration
74+
})
7975
return keys
8076
}
8177

8278
// Clone the attributes object itself.
8379
func (a *Attributes) Clone() *Attributes {
84-
a.mu.RLock()
85-
defer a.mu.RUnlock()
86-
87-
m := make(map[string]Cloneable, len(a.data))
88-
for k, v := range a.data {
89-
m[k] = v.Clone()
90-
}
91-
return &Attributes{
92-
data: m,
80+
cloned := &Attributes{
81+
data: sync.Map{},
9382
}
83+
84+
a.data.Range(func(k, v interface{}) bool {
85+
cloned.data.Store(k, v)
86+
return true
87+
})
88+
return cloned
9489
}

pkg/epp/datalayer/datasource.go

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,6 @@ type DataSource interface {
4343
// the data source output type upon registration.
4444
AddExtractor(extractor Extractor) error
4545

46-
// TODO: the following is useful for a data source that operates on
47-
// endpoints and might not be relevant for "global/system" collectors which
48-
// might not need the concept of an endpoint. This can be split, if needed,
49-
// to a separate interface in the future.
50-
5146
// AddEndpoint adds an endpoint to collect from.
5247
AddEndpoint(ep Endpoint) error
5348

@@ -76,8 +71,7 @@ var (
7671
// DataSourceRegistry stores named data sources and makes them
7772
// accessible to GIE subsystems.
7873
type DataSourceRegistry struct {
79-
mu sync.RWMutex
80-
sources map[string]DataSource
74+
sources sync.Map
8175
}
8276

8377
// Register adds a source to the registry.
@@ -86,13 +80,10 @@ func (dsr *DataSourceRegistry) Register(src DataSource) error {
8680
return errors.New("unable to register a nil data source")
8781
}
8882

89-
dsr.mu.Lock()
90-
defer dsr.mu.Unlock()
91-
92-
if _, found := dsr.sources[src.Name()]; found {
83+
if _, found := dsr.sources.Load(src.Name()); found {
9384
return fmt.Errorf("unable to register duplicate data source: %s", src.Name())
9485
}
95-
dsr.sources[src.Name()] = src
86+
dsr.sources.Store(src.Name(), src)
9687
return nil
9788
}
9889

@@ -102,10 +93,10 @@ func (dsr *DataSourceRegistry) GetNamedSource(name string) (DataSource, bool) {
10293
return nil, false
10394
}
10495

105-
dsr.mu.RLock()
106-
defer dsr.mu.RUnlock()
107-
if ds, found := dsr.sources[name]; found {
108-
return ds, true
96+
if val, found := dsr.sources.Load(name); found {
97+
if ds, ok := val.(DataSource); ok {
98+
return ds, true
99+
} // ignore type assertion failures and fall through
109100
}
110101
return nil, false
111102
}
@@ -122,15 +113,15 @@ func (dsr *DataSourceRegistry) AddEndpoint(ep Endpoint) error {
122113
return nil
123114
}
124115

125-
dsr.mu.RLock()
126-
defer dsr.mu.RUnlock()
127116
errs := []error{}
128-
129-
for _, ds := range dsr.sources {
130-
if err := ds.AddEndpoint(ep); err != nil {
131-
errs = append(errs, err)
117+
dsr.sources.Range(func(_, val interface{}) bool {
118+
if ds, ok := val.(DataSource); ok {
119+
if err := ds.AddEndpoint(ep); err != nil {
120+
errs = append(errs, err)
121+
}
132122
}
133-
}
123+
return true
124+
})
134125
return errors.Join(errs...)
135126
}
136127

@@ -142,15 +133,15 @@ func (dsr *DataSourceRegistry) RemoveEndpoint(ep Endpoint) error {
142133
return nil
143134
}
144135

145-
dsr.mu.RLock()
146-
defer dsr.mu.RUnlock()
147136
errs := []error{}
148-
149-
for _, ds := range dsr.sources {
150-
if err := ds.RemoveEndpoint(ep); err != nil {
151-
errs = append(errs, err)
137+
dsr.sources.Range(func(_, val interface{}) bool {
138+
if ds, ok := val.(DataSource); ok {
139+
if err := ds.RemoveEndpoint(ep); err != nil {
140+
errs = append(errs, err)
141+
}
152142
}
153-
}
143+
return true
144+
})
154145
return errors.Join(errs...)
155146
}
156147

0 commit comments

Comments
 (0)