Skip to content

Feature/expose configuration error #3205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/moby/swarmkit/v2/manager/orchestrator/volumeenforcer"
"github.com/moby/swarmkit/v2/manager/resourceapi"
"github.com/moby/swarmkit/v2/manager/scheduler"
"github.com/moby/swarmkit/v2/manager/scheduler/common"
"github.com/moby/swarmkit/v2/manager/state/raft"
"github.com/moby/swarmkit/v2/manager/state/raft/transport"
"github.com/moby/swarmkit/v2/manager/state/store"
Expand Down
40 changes: 40 additions & 0 deletions manager/scheduler/common/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package common

import (
"github.com/moby/swarmkit/v2/api"
)

// NodeInfo contains a node and its available resources
type NodeInfo struct {
Node *api.Node
Tasks map[string]*api.Task
AvailableResources *api.Resources
// Simplified version of NodeInfo that avoids circular dependencies
}

// NewNodeInfo creates a new NodeInfo instance
func NewNodeInfo(node *api.Node, tasks map[string]*api.Task, resources *api.Resources) NodeInfo {
return NodeInfo{
Node: node,
Tasks: tasks,
AvailableResources: resources,
}
}

// PluginInterface defines the scheduler plugin interface in a common package to avoid import cycles
type PluginInterface interface {
// Name returns the name of the plugin
Name() string
// Schedule allows the plugin to select a node for the task
Schedule(task *api.Task, nodeSet []NodeInfo) (NodeInfo, error)
}

// PluginScheduler defines the interface for plugin management
type PluginScheduler interface {
// RegisterPlugin registers a plugin with the scheduler
RegisterPlugin(p PluginInterface) error
// Schedule runs the scheduling process using the specified plugin
Schedule(pluginName string, task *api.Task, nodes []NodeInfo) (NodeInfo, error)
// GetPlugin returns a plugin by name
GetPlugin(name string) PluginInterface
}
35 changes: 34 additions & 1 deletion manager/scheduler/nodeinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/api/genericresource"
"github.com/moby/swarmkit/v2/log"
"github.com/moby/swarmkit/v2/manager/scheduler/common"
)

// hostPortSpec specifies a used host port.
Expand Down Expand Up @@ -126,7 +127,6 @@ func (nodeInfo *NodeInfo) addTask(t *api.Task) bool {

reservations := taskReservations(t.Spec)
resources := nodeInfo.AvailableResources

resources.MemoryBytes -= reservations.MemoryBytes
resources.NanoCPUs -= reservations.NanoCPUs

Expand Down Expand Up @@ -219,3 +219,36 @@ func (nodeInfo *NodeInfo) countRecentFailures(now time.Time, t *api.Task) int {

return recentFailureCount
}

// ToCommon converts the internal NodeInfo to common.NodeInfo
func (nodeInfo *NodeInfo) ToCommon() common.NodeInfo {
return common.NodeInfo{
Node: nodeInfo.Node,
Tasks: nodeInfo.Tasks,
AvailableResources: nodeInfo.AvailableResources,
}
}

// FromCommon converts a common.NodeInfo to the internal NodeInfo
func FromCommon(commonInfo common.NodeInfo) NodeInfo {
nodeInfo := NodeInfo{
Node: commonInfo.Node,
Tasks: commonInfo.Tasks,
AvailableResources: commonInfo.AvailableResources,
ActiveTasksCount: 0,
ActiveTasksCountByService: make(map[string]int),
usedHostPorts: make(map[hostPortSpec]struct{}),
recentFailures: make(map[versionedService][]time.Time),
lastCleanup: time.Now(),
}

// Count active tasks
for _, t := range commonInfo.Tasks {
if t.DesiredState <= api.TaskStateCompleted {
nodeInfo.ActiveTasksCount++
nodeInfo.ActiveTasksCountByService[t.ServiceID]++
}
}

return nodeInfo
}
39 changes: 39 additions & 0 deletions manager/scheduler/placement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package scheduler

import (
"sync"

"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/manager/scheduler/common"
)

// PlacementPlugin defines the interface for external placement plugins.
type PlacementPlugin interface {
// Name returns the name of the plugin.
Name() string
// FilterNodes allows the plugin to filter and rank nodes for a given task.
FilterNodes(task *api.Task, nodes []common.NodeInfo) ([]common.NodeInfo, error)
}

var (
plugins = make(map[string]common.PluginInterface)
pluginsLock sync.RWMutex
)

// RegisterPlacementPlugin registers a placement plugin for use by the scheduler
func RegisterPlacementPlugin(p common.PluginInterface) {
if p == nil {
return
}
pluginsLock.Lock()
defer pluginsLock.Unlock()
plugins[p.Name()] = p
}

// GetPlacementPlugin returns a placement plugin by name, or nil if no plugin
// with the given name is registered.
func GetPlacementPlugin(name string) common.PluginInterface {
pluginsLock.RLock()
defer pluginsLock.RUnlock()
return plugins[name]
}
30 changes: 30 additions & 0 deletions manager/scheduler/pluginscheduler/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package pluginscheduler

import (
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/manager/scheduler/common"
)

// Plugin represents a scheduler plugin
type Plugin struct {
name string
scheduler common.PluginInterface
}

// NewPlugin creates a new scheduler plugin
func NewPlugin(name string, scheduler common.PluginInterface) *Plugin {
return &Plugin{
name: name,
scheduler: scheduler,
}
}

// Name returns the name of the plugin
func (p *Plugin) Name() string {
return p.name
}

// Schedule allows the plugin to select a node for the task
func (p *Plugin) Schedule(task *api.Task, nodeSet []common.NodeInfo) (common.NodeInfo, error) {
return p.scheduler.Schedule(task, nodeSet)
}
47 changes: 47 additions & 0 deletions manager/scheduler/pluginscheduler/pluginscheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package pluginscheduler

import (
"fmt"

"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/manager/scheduler/common"
)

// Scheduler handles scheduling using plugins
type Scheduler struct {
plugins map[string]common.PluginInterface
}

// New creates a new plugin scheduler
func New() *Scheduler {
return &Scheduler{
plugins: make(map[string]common.PluginInterface),
}
}

// RegisterPlugin registers a plugin with the scheduler
func (s *Scheduler) RegisterPlugin(p common.PluginInterface) error {
if p == nil {
return fmt.Errorf("cannot register nil plugin")
}
name := p.Name()
if name == "" {
return fmt.Errorf("plugin name cannot be empty")
}
s.plugins[name] = p
return nil
}

// Schedule runs the scheduling process using the specified plugin
func (s *Scheduler) Schedule(pluginName string, task *api.Task, nodes []common.NodeInfo) (common.NodeInfo, error) {
p, exists := s.plugins[pluginName]
if !exists {
return common.NodeInfo{}, fmt.Errorf("plugin %s not found", pluginName)
}
return p.Schedule(task, nodes)
}

// GetPlugin returns a plugin by name
func (s *Scheduler) GetPlugin(name string) common.PluginInterface {
return s.plugins[name]
}
41 changes: 41 additions & 0 deletions manager/scheduler/pluginwrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package scheduler

import (
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/manager/scheduler/common"
)

// PluginAdapter adapts a PlacementPlugin to the common.PluginInterface
type PluginAdapter struct {
plugin PlacementPlugin
name string
}

// NewPluginAdapter creates a new adapter for a placement plugin
func NewPluginAdapter(name string, plugin PlacementPlugin) *PluginAdapter {
return &PluginAdapter{
plugin: plugin,
name: name,
}
}

// Name returns the plugin name
func (p *PluginAdapter) Name() string {
return p.name
}

// Schedule implements the common.PluginInterface
func (p *PluginAdapter) Schedule(task *api.Task, nodes []common.NodeInfo) (common.NodeInfo, error) {
// Call the underlying plugin
filtered, err := p.plugin.FilterNodes(task, nodes)
if err != nil {
return common.NodeInfo{}, err
}

// If we have results, return the first one
if len(filtered) > 0 {
return filtered[0], nil
}

return common.NodeInfo{}, nil
}
Loading