@@ -17,59 +17,56 @@ limitations under the License.
17
17
package datalayer
18
18
19
19
import (
20
- "context"
21
20
"errors"
22
21
"fmt"
23
22
"reflect"
24
23
"sync"
25
24
)
26
25
27
- // DataSource is an interface required from all datalayer data collection
26
+ // DataSource is an interface required from all data layer data collection
28
27
// sources.
29
28
type DataSource interface {
30
29
// Name returns the name of this datasource.
31
30
Name () string
32
31
33
- // Start begins the collection process.
34
- Start (ctx context.Context ) error
35
-
36
- // Stop stops the collection process.
37
- Stop ()
38
-
39
32
// AddExtractor adds an extractor to the data source.
40
- // The extractor will be called whenever the data source might
33
+ // The extractor will be called whenever the Collector might
41
34
// have some new raw information regarding an endpoint.
42
35
// The Extractor's expected input type should be validated against
43
- // the data source output type upon registration.
36
+ // the data source's output type upon registration.
44
37
AddExtractor (extractor Extractor ) error
45
38
46
- // AddEndpoint adds an endpoint to collect from.
47
- AddEndpoint (ep Endpoint ) error
48
-
49
- // RemoveEndpoint removes an endpoint from collection.
50
- RemoveEndpoint (ep Endpoint ) error
39
+ // Collect is triggered by the data layer framework to fetch potentially new
40
+ // data for an endpoint. It passes retrieved data to registered Extractors.
41
+ Collect (ep Endpoint )
51
42
}
52
43
44
+ // Extractor is used to convert raw data into relevant data layer information
45
+ // for an endpoint. They are called by data sources whenever new data might be
46
+ // be available. Multiple Extractors can be registered with a source. Extractors
47
+ // are expected to save their output with an endpoint so it becomes accessible
48
+ // to consumers in other subsystem of the inference gateway (e.g., when making
49
+ // scheduling decisions).
53
50
type Extractor interface {
54
51
// Name returns the name of the extractor.
55
52
Name () string
56
53
57
54
// ExpectedType defines the type expected by the extractor. It must match
58
- // the DataSource.OutputType() the extractor registers for .
59
- ExpectedType () reflect.Type
55
+ // the output type of the data source where the extractor is registered .
56
+ ExpectedInputType () reflect.Type
60
57
61
58
// Extract transforms the data source output into a concrete attribute that
62
59
// is stored on the given endpoint.
63
60
Extract (data any , ep Endpoint )
64
61
}
65
62
66
63
var (
67
- // DefaultDataSources is the system default data source registry.
68
- DefaultDataSources = DataSourceRegistry {}
64
+ // defaultDataSources is the system default data source registry.
65
+ defaultDataSources = DataSourceRegistry {}
69
66
)
70
67
71
68
// DataSourceRegistry stores named data sources and makes them
72
- // accessible to GIE subsystems.
69
+ // accessible to other subsystems in the inference gateway .
73
70
type DataSourceRegistry struct {
74
71
sources sync.Map
75
72
}
@@ -101,70 +98,32 @@ func (dsr *DataSourceRegistry) GetNamedSource(name string) (DataSource, bool) {
101
98
return nil , false
102
99
}
103
100
104
- // AddEndpoint adds a new endpoint to all registered sources.
105
- // Endpoints are not tracked and DataSources are only notified of
106
- // endpoints added after the data source has been registered.
107
- //
108
- // TODO: track endpoints and update on later source registrations? It seems safe
109
- // to assume that all sources are registered before endpoints are
110
- // discovered and added to the system.
111
- func (dsr * DataSourceRegistry ) AddEndpoint (ep Endpoint ) error {
112
- if ep == nil {
113
- return nil
114
- }
115
-
116
- errs := []error {}
117
- dsr .sources .Range (func (_ , val interface {}) bool {
118
- if ds , ok := val .(DataSource ); ok {
119
- if err := ds .AddEndpoint (ep ); err != nil {
120
- errs = append (errs , err )
121
- }
122
- }
123
- return true
124
- })
125
- return errors .Join (errs ... )
126
- }
127
-
128
- // RemoveEndpoint removes an endpoint from all registered sources.
129
- // A source may be called to remove an endpoint it has not added - this
130
- // is should not result in an error.
131
- func (dsr * DataSourceRegistry ) RemoveEndpoint (ep Endpoint ) error {
132
- if ep == nil {
133
- return nil
134
- }
135
-
136
- errs := []error {}
137
- dsr .sources .Range (func (_ , val interface {}) bool {
101
+ // GetSources returns all sources registered.
102
+ func (dsr * DataSourceRegistry ) GetSources () []DataSource {
103
+ sources := []DataSource {}
104
+ dsr .sources .Range (func (_ , val any ) bool {
138
105
if ds , ok := val .(DataSource ); ok {
139
- if err := ds .RemoveEndpoint (ep ); err != nil {
140
- errs = append (errs , err )
141
- }
106
+ sources = append (sources , ds )
142
107
}
143
- return true
108
+ return true // continue iteration
144
109
})
145
- return errors . Join ( errs ... )
110
+ return sources
146
111
}
147
112
148
113
// RegisterSource adds the data source to the default registry.
149
114
func RegisterSource (src DataSource ) error {
150
- return DefaultDataSources .Register (src )
115
+ return defaultDataSources .Register (src )
151
116
}
152
117
153
118
// GetNamedSource returns the named source from the default registry,
154
119
// if found.
155
120
func GetNamedSource (name string ) (DataSource , bool ) {
156
- return DefaultDataSources .GetNamedSource (name )
157
- }
158
-
159
- // AddEndpoint adds an endpoint to all sources in the default source registry.
160
- func AddEndpoint (ep Endpoint ) error {
161
- return DefaultDataSources .AddEndpoint (ep )
121
+ return defaultDataSources .GetNamedSource (name )
162
122
}
163
123
164
- // RemoveEndpoint removes an endpoint from all sources in the default source
165
- // registry.
166
- func RemoveEndpoint (ep Endpoint ) error {
167
- return DefaultDataSources .RemoveEndpoint (ep )
124
+ // GetSources returns all sources in the default registry.
125
+ func GetSources () []DataSource {
126
+ return defaultDataSources .GetSources ()
168
127
}
169
128
170
129
// ValidateExtractorType checks if an extractor can handle
0 commit comments