(feat) initial types and interfaces for pluggable data layer #1154

Merged 6 commits on Jul 17, 2025

89 changes: 89 additions & 0 deletions pkg/epp/datalayer/attributemap.go
@@ -0,0 +1,89 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datalayer

import (
"sync"
)

// Cloneable types support cloning of the value.
type Cloneable interface {
Clone() Cloneable
}

// AttributeMap is used to store flexible metadata or traits
// across different aspects of an inference server.
// Stored values must be Cloneable.
type AttributeMap interface {
Put(string, Cloneable)
Get(string) (Cloneable, bool)
Keys() []string
}

// Attributes provides a goroutine-safe implementation of AttributeMap.
type Attributes struct {
data sync.Map
}

// NewAttributes returns a new attribute map instance.
func NewAttributes() *Attributes {
return &Attributes{
data: sync.Map{},
}
}

// Put adds (or updates) an attribute in the map.
func (a *Attributes) Put(key string, value Cloneable) {
a.data.Store(key, value) // TODO: Clone into map?
}

// Get returns an attribute from the map.
func (a *Attributes) Get(key string) (Cloneable, bool) {
val, ok := a.data.Load(key)
if !ok {
return nil, false
}
if cloneable, ok := val.(Cloneable); ok {
return cloneable.Clone(), true
}
return nil, false // shouldn't happen since Put accepts Cloneables only
}

// Keys returns a slice of the names of all attributes stored in the map.
func (a *Attributes) Keys() []string {
keys := []string{}
a.data.Range(func(key, _ any) bool {
if k, ok := key.(string); ok {
keys = append(keys, k)
}
return true // continue iteration
})
return keys
}

// Clone returns a copy of the Attributes object itself.
func (a *Attributes) Clone() *Attributes {
cloned := &Attributes{
data: sync.Map{},
}

a.data.Range(func(k, v interface{}) bool {
cloned.data.Store(k, v)
return true
})
return cloned
}
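
For illustration, here is a minimal usage sketch of the attribute map, written as an in-package Go example; the nodeName type is hypothetical and not part of this change (assumes package datalayer, with fmt imported).

// nodeName is a hypothetical Cloneable attribute, for illustration only.
type nodeName string

// Clone satisfies Cloneable; nodeName is a value type, so returning the
// receiver yields a copy.
func (n nodeName) Clone() Cloneable { return n }

func ExampleAttributes() {
	attrs := NewAttributes()
	attrs.Put("node", nodeName("worker-1"))

	if v, ok := attrs.Get("node"); ok {
		fmt.Println(v) // Get returns a clone, not the stored value
	}
	fmt.Println(attrs.Keys())
	// Output:
	// worker-1
	// [node]
}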
148 changes: 148 additions & 0 deletions pkg/epp/datalayer/datasource.go
@@ -0,0 +1,148 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datalayer

import (
"errors"
"fmt"
"reflect"
"sync"
)

// DataSource is the interface implemented by all data layer collection
// sources.
type DataSource interface {
Collaborator


This implies that each data source will have its own list of endpoints that all need to be kept up to date, rather than feeding into a centralized list of endpoints with just its custom data included.

OOC, why did we choose this path?

Contributor Author

@elevran · Jul 15, 2025


There are two possible design options. I'm fine with either, so if there's a strong preference one way or the other, I'm happy to change it.

  1. Data sources keep references to endpoints (e.g., in the case of /metrics, a source needs the IP address and a way to set metrics on the endpoint itself). This is what I had in mind as the rationale for Add/RemoveEndpoint.
  2. Endpoints keep references (directly or indirectly) to data sources (see bullets below).

I was thinking of the first option as more flexible (e.g., a data source that records the node name for each endpoint does not need to be continuously called for the endpoint... the data source would not need to create a goroutine for this endpoint at all), but engineering-wise both work.

If we prefer a model where there is explicitly one goroutine per endpoint (possible downsides: head-of-line blocking by "slow/failing" data sources impacts collection by others, all data sources are applicable to all endpoints, etc.), the design would change as follows:

  • The data store owns the goroutines and calls start/stop when endpoints are added and removed. There is a "collection context/state" stored in the datastore per endpoint, and there should not be a start/stop RefreshLoop method defined on endpoints - they're "pure data structures".
  • On every "tick", the goroutine code calls all data sources to CollectFor(ep). Internally the data source can still call multiple extractors, so the main change is the removal of the Start/Stop methods and the addition of CollectFor to the data source.

WDYT? The second option (a goroutine per endpoint calling all data sources, passing each the endpoint) is possibly closer to how it is done in GIE today, so we can continue along that path.

Contributor Author

@elevran · Jul 15, 2025


@kfswain @nirrozenbaum this is a schematic snippet of what the second option may look like:

type Collector interface {
  Name() string
  CollectFor(Endpoint) error
  AddExtractor(Extractor)
}

type Extractor interface {
  Name() string
  ExpectedType() reflect.Type
  Extract(interface{}, Endpoint)
}

// When the data store is notified of a new Pod, it starts a goroutine for it
// (there is a scraping context that saves a reference to the Endpoint,
// cancellation contexts, etc.)
func (dlc *CollectionState) CollectionCycle() {
  ep := dlc.Endpoint()
  for _, c := range datalayer.Collectors() { // or we can store collectors on the collection state directly
    if err := c.CollectFor(ep); err != nil {
      // handle failures - such as exponential backoff, disabling collectors after some time...
    }
    // if successful, the collector has called extractors with ep for updating
  }
}

Let me know if this seems more "natural/aligned".

Contributor Author


Changed so that DataSource is no longer informed of endpoints and no longer tracks them.

Collaborator

@kfswain · Jul 17, 2025


I'm with you here. The second option, from my perspective, separates concerns more cleanly:

  • Let the datastore focus on the actual collection mechanism. It does potentially create head-of-line blocking, agreed, but this seems acceptable with proper timeouts/backoff.
  • The Pod controller remains the single source of truth with respect to endpoints and just needs to update the datastore endpoint list.
  • Custom endpoints just focus on what data they care about, and how to get it.

// Name returns the name of this datasource.
Name() string

// AddExtractor adds an extractor to the data source.
// The extractor will be called whenever the data source might
// have new raw information regarding an endpoint.
// The Extractor's expected input type should be validated against
// the data source's output type upon registration.
AddExtractor(extractor Extractor) error

// Collect is triggered by the data layer framework to fetch potentially new
// data for an endpoint. It passes retrieved data to registered Extractors.
Collect(ep Endpoint)
}

// Extractor converts raw data from a data source into relevant data layer
// information for an endpoint. Extractors are called by data sources whenever
// new data might be available. Multiple Extractors can be registered with a
// source. Extractors are expected to save their output on an endpoint so it
// becomes accessible to consumers in other subsystems of the inference
// gateway (e.g., when making scheduling decisions).
type Extractor interface {
// Name returns the name of the extractor.
Name() string

// ExpectedInputType returns the type expected by the extractor. It must match
// the output type of the data source where the extractor is registered.
ExpectedInputType() reflect.Type

// Extract transforms the data source output into a concrete attribute that
// is stored on the given endpoint.
Extract(data any, ep Endpoint)
}

var (
// defaultDataSources is the system default data source registry.
defaultDataSources = DataSourceRegistry{}
)

// DataSourceRegistry stores named data sources and makes them
// accessible to other subsystems in the inference gateway.
type DataSourceRegistry struct {
sources sync.Map
}

// Register adds a source to the registry.
func (dsr *DataSourceRegistry) Register(src DataSource) error {
if src == nil {
return errors.New("unable to register a nil data source")
}

// LoadOrStore makes the existence check and insertion atomic, avoiding a
// race between concurrent registrations of the same name
if _, loaded := dsr.sources.LoadOrStore(src.Name(), src); loaded {
return fmt.Errorf("unable to register duplicate data source: %s", src.Name())
}
return nil
}

// GetNamedSource returns the named data source, if found.
func (dsr *DataSourceRegistry) GetNamedSource(name string) (DataSource, bool) {
if name == "" {
return nil, false
}

if val, found := dsr.sources.Load(name); found {
if ds, ok := val.(DataSource); ok {
return ds, true
} // ignore type assertion failures and fall through
}
return nil, false
}

// GetSources returns all sources registered.
func (dsr *DataSourceRegistry) GetSources() []DataSource {
sources := []DataSource{}
dsr.sources.Range(func(_, val any) bool {
if ds, ok := val.(DataSource); ok {
sources = append(sources, ds)
}
return true // continue iteration
})
return sources
}

// RegisterSource adds the data source to the default registry.
func RegisterSource(src DataSource) error {
return defaultDataSources.Register(src)
}

// GetNamedSource returns the named source from the default registry,
// if found.
func GetNamedSource(name string) (DataSource, bool) {
return defaultDataSources.GetNamedSource(name)
}

// GetSources returns all sources in the default registry.
func GetSources() []DataSource {
return defaultDataSources.GetSources()
}

// ValidateExtractorType checks if an extractor can handle
// the collector's output.
func ValidateExtractorType(collectorOutputType, extractorInputType reflect.Type) error {
if collectorOutputType == extractorInputType {
return nil
}

// extractor accepts anything (i.e., interface{})
if extractorInputType.Kind() == reflect.Interface && extractorInputType.NumMethod() == 0 {
return nil
}

// check if collector output implements the extractor input interface; guard
// on Kind, since reflect.Type.Implements panics for non-interface arguments
if extractorInputType.Kind() == reflect.Interface && collectorOutputType.Implements(extractorInputType) {
return nil
}

return fmt.Errorf("extractor input type %v cannot handle collector output type %v",
extractorInputType, collectorOutputType)
}
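
To make the DataSource/Extractor contract concrete, here is a minimal sketch of a source, written as if in package datalayer; the fakeMetricsSource name and its payload are hypothetical, for illustration only.

// fakeMetricsSource emits its scraped payload to extractors as a string.
type fakeMetricsSource struct {
	extractors []Extractor
}

func (s *fakeMetricsSource) Name() string { return "fake-metrics" }

func (s *fakeMetricsSource) AddExtractor(ex Extractor) error {
	// this source outputs string, so extractors are validated against it
	if err := ValidateExtractorType(reflect.TypeOf(""), ex.ExpectedInputType()); err != nil {
		return err
	}
	s.extractors = append(s.extractors, ex)
	return nil
}

func (s *fakeMetricsSource) Collect(ep Endpoint) {
	raw := "kv_cache_usage 0.42" // stand-in for a scraped /metrics body
	for _, ex := range s.extractors {
		ex.Extract(raw, ep) // each extractor stores what it parses on ep
	}
}

var _ DataSource = (*fakeMetricsSource)(nil) // compile-time interface check

Such a source would be registered once at startup, e.g. RegisterSource(&fakeMetricsSource{}), after which the framework can invoke Collect for each endpoint on its refresh cycle.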
40 changes: 40 additions & 0 deletions pkg/epp/datalayer/endpoint.go
@@ -0,0 +1,40 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datalayer

import (
corev1 "k8s.io/api/core/v1"
)

// EndpointPodState allows management of the Pod-related attributes.
type EndpointPodState interface {
GetPod() *PodInfo
UpdatePod(*corev1.Pod)
}

// EndpointMetricsState allows management of the Metrics-related attributes.
type EndpointMetricsState interface {
GetMetrics() *Metrics
UpdateMetrics(*Metrics)
}

// Endpoint represents an inference serving endpoint and its related attributes.
type Endpoint interface {
EndpointPodState
EndpointMetricsState
AttributeMap
}
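
Since this file defines only interfaces, the following sketch shows what a concrete Endpoint might look like; it is illustrative only and assumes the PodInfo type referenced above (not part of this diff), a hypothetical toPodInfo conversion helper, and the sync and corev1 imports.

// endpoint is a hypothetical implementation of the Endpoint interface.
type endpoint struct {
	Attributes // embedded goroutine-safe AttributeMap implementation

	mu      sync.Mutex
	pod     *PodInfo
	metrics *Metrics
}

func (e *endpoint) GetPod() *PodInfo {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.pod
}

func (e *endpoint) UpdatePod(p *corev1.Pod) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.pod = toPodInfo(p) // hypothetical conversion helper
}

func (e *endpoint) GetMetrics() *Metrics {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.metrics
}

func (e *endpoint) UpdateMetrics(m *Metrics) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.metrics = m
}

var _ Endpoint = (*endpoint)(nil) // compile-time interface check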
80 changes: 80 additions & 0 deletions pkg/epp/datalayer/metrics.go
@@ -0,0 +1,80 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package datalayer

import (
"fmt"
"time"
)

// Metrics holds the latest metrics snapshot scraped from a pod.
type Metrics struct {
// ActiveModels is a set of models (including LoRA adapters) that are currently cached on the GPU.
ActiveModels map[string]int
WaitingModels map[string]int
// MaxActiveModels is the maximum number of models that can be loaded to GPU.
MaxActiveModels int
RunningQueueSize int
WaitingQueueSize int
KVCacheUsagePercent float64
KvCacheMaxTokenCapacity int

// UpdateTime records the last time when the metrics were updated.
UpdateTime time.Time
}

// NewMetrics initializes a new empty Metrics object.
func NewMetrics() *Metrics {
return &Metrics{
ActiveModels: make(map[string]int),
WaitingModels: make(map[string]int),
}
}

// String returns a string with all Metrics information.
func (m *Metrics) String() string {
if m == nil {
return ""
}
return fmt.Sprintf("%+v", *m)
}

// Clone creates a copy of Metrics and returns its pointer.
// Clone returns nil if the object being cloned is nil.
func (m *Metrics) Clone() *Metrics {
if m == nil {
return nil
}
activeModels := make(map[string]int, len(m.ActiveModels))
for key, value := range m.ActiveModels {
activeModels[key] = value
}
waitingModels := make(map[string]int, len(m.WaitingModels))
for key, value := range m.WaitingModels {
waitingModels[key] = value
}
return &Metrics{
ActiveModels: activeModels,
WaitingModels: waitingModels,
MaxActiveModels: m.MaxActiveModels,
RunningQueueSize: m.RunningQueueSize,
WaitingQueueSize: m.WaitingQueueSize,
KVCacheUsagePercent: m.KVCacheUsagePercent,
KvCacheMaxTokenCapacity: m.KvCacheMaxTokenCapacity,
UpdateTime: m.UpdateTime,
}
}
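
A short sketch of the Clone semantics: the model maps are copied, so mutating the clone does not affect the original (the adapter names are arbitrary; assumes package datalayer, with fmt imported).

func ExampleMetrics_Clone() {
	m := NewMetrics()
	m.ActiveModels["lora-a"] = 1

	c := m.Clone()
	c.ActiveModels["lora-b"] = 1 // visible only in the clone

	fmt.Println(len(m.ActiveModels), len(c.ActiveModels))
	// Output: 1 2
}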