Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,8 @@ spec:
- --pprof-port=$(PPROF_PORT)
- --pprof-block-rate=$(PPROF_BLOCK_RATE)
- --pprof-mutex-rate=$(PPROF_MUTEX_RATE)
- --retry-creating-failed-pipelines-tick=$(RETRY_CREATING_FAILED_PIPELINES_TICK)
- --retry-deleting-failed-pipelines-tick=$(RETRY_DELETING_FAILED_PIPELINES_TICK)
command:
- /bin/scheduler
env:
Expand Down Expand Up @@ -611,6 +613,10 @@ spec:
value: "0"
- name: PPROF_MUTEX_RATE
value: "0"
- name: RETRY_CREATING_FAILED_PIPELINES_TICK
value: 60s
- name: RETRY_DELETING_FAILED_PIPELINES_TICK
value: 60s
image: '{{ .Values.scheduler.image.registry }}/{{ .Values.scheduler.image.repository
}}:{{ .Values.scheduler.image.tag }}'
imagePullPolicy: '{{ .Values.scheduler.image.pullPolicy }}'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,8 @@ spec:
- --pprof-port=$(PPROF_PORT)
- --pprof-block-rate=$(PPROF_BLOCK_RATE)
- --pprof-mutex-rate=$(PPROF_MUTEX_RATE)
- --retry-creating-failed-pipelines-tick=$(RETRY_CREATING_FAILED_PIPELINES_TICK)
- --retry-deleting-failed-pipelines-tick=$(RETRY_DELETING_FAILED_PIPELINES_TICK)
command:
- /bin/scheduler
env:
Expand Down Expand Up @@ -611,6 +613,10 @@ spec:
value: "0"
- name: PPROF_MUTEX_RATE
value: "0"
- name: RETRY_CREATING_FAILED_PIPELINES_TICK
value: 60s
- name: RETRY_DELETING_FAILED_PIPELINES_TICK
value: 60s
image: '{{ .Values.scheduler.image.registry }}/{{ .Values.scheduler.image.repository
}}:{{ .Values.scheduler.image.tag }}'
imagePullPolicy: '{{ .Values.scheduler.image.pullPolicy }}'
Expand Down
6 changes: 6 additions & 0 deletions k8s/yaml/components.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ spec:
- --pprof-port=$(PPROF_PORT)
- --pprof-block-rate=$(PPROF_BLOCK_RATE)
- --pprof-mutex-rate=$(PPROF_MUTEX_RATE)
- --retry-creating-failed-pipelines-tick=$(RETRY_CREATING_FAILED_PIPELINES_TICK)
- --retry-deleting-failed-pipelines-tick=$(RETRY_DELETING_FAILED_PIPELINES_TICK)
command:
- /bin/scheduler
env:
Expand Down Expand Up @@ -454,6 +456,10 @@ spec:
value: "0"
- name: PPROF_MUTEX_RATE
value: "0"
- name: RETRY_CREATING_FAILED_PIPELINES_TICK
value: 60s
- name: RETRY_DELETING_FAILED_PIPELINES_TICK
value: 60s
image: 'docker.io/seldonio/seldon-scheduler:latest'
imagePullPolicy: 'IfNotPresent'
livenessProbe:
Expand Down
6 changes: 6 additions & 0 deletions operator/config/seldonconfigs/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ spec:
- --pprof-port=$(PPROF_PORT)
- --pprof-block-rate=$(PPROF_BLOCK_RATE)
- --pprof-mutex-rate=$(PPROF_MUTEX_RATE)
- --retry-creating-failed-pipelines-tick=$(RETRY_CREATING_FAILED_PIPELINES_TICK)
- --retry-deleting-failed-pipelines-tick=$(RETRY_DELETING_FAILED_PIPELINES_TICK)
command:
- /bin/scheduler
env:
Expand Down Expand Up @@ -386,6 +388,10 @@ spec:
value: "0"
- name: PPROF_MUTEX_RATE
value: "0"
- name: RETRY_CREATING_FAILED_PIPELINES_TICK
value: "60s"
- name: RETRY_DELETING_FAILED_PIPELINES_TICK
value: "60s"
image: seldonio/seldon-scheduler:latest
imagePullPolicy: Always
name: scheduler
Expand Down
76 changes: 43 additions & 33 deletions scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,37 +51,39 @@ import (
)

// Package-level settings for the scheduler binary.
// The flag registrations visible in init() bind the pprof and retry-tick
// variables below to command-line flags.
// NOTE(review): every pre-existing declaration is listed twice here; this looks
// like the diff rendering the old lines followed by the re-aligned new lines —
// confirm against the real file, which should declare each name only once.
var (
envoyPort uint
agentPort uint
agentMtlsPort uint
schedulerPort uint
schedulerMtlsPort uint
chainerPort uint
healthProbePort uint
namespace string
pipelineGatewayHost string
pipelineGatewayHttpPort int
pipelineGatewayGrpcPort int
logLevel string
tracingConfigPath string
dbPath string
nodeID string
allowPlaintxt bool // scheduler server
autoscalingModelEnabled bool
autoscalingServerEnabled bool
kafkaConfigPath string
scalingConfigPath string
schedulerReadyTimeoutSeconds uint
deletedResourceTTLSeconds uint
serverPackingEnabled bool
serverPackingPercentage float64
accessLogPath string
enableAccessLog bool
includeSuccessfulRequests bool
enablePprof bool
pprofPort int
pprofMutexRate int
pprofBlockRate int
envoyPort uint
agentPort uint
agentMtlsPort uint
schedulerPort uint
schedulerMtlsPort uint
chainerPort uint
healthProbePort uint
namespace string
pipelineGatewayHost string
pipelineGatewayHttpPort int
pipelineGatewayGrpcPort int
logLevel string
tracingConfigPath string
dbPath string
nodeID string
allowPlaintxt bool // scheduler server
autoscalingModelEnabled bool
autoscalingServerEnabled bool
kafkaConfigPath string
scalingConfigPath string
schedulerReadyTimeoutSeconds uint
deletedResourceTTLSeconds uint
serverPackingEnabled bool
serverPackingPercentage float64
accessLogPath string
enableAccessLog bool
includeSuccessfulRequests bool
enablePprof bool
// pprofPort is the pprof HTTP server port (flag --pprof-port, default 6060).
pprofPort int
// pprofMutexRate is the pprof mutex rate (flag --pprof-mutex-rate, default 0).
pprofMutexRate int
// pprofBlockRate is the pprof block rate (flag --pprof-block-rate, default 0).
pprofBlockRate int
// retryFailedCreatingPipelinesTick is the tick interval for re-attempting to
// create pipelines which failed to create
// (flag --retry-creating-failed-pipelines-tick, default one minute).
retryFailedCreatingPipelinesTick time.Duration
// retryFailedDeletePipelinesTick is the tick interval for re-attempting to
// delete pipelines which failed to terminate
// (flag --retry-deleting-failed-pipelines-tick, default one minute).
retryFailedDeletePipelinesTick time.Duration
)

const (
Expand Down Expand Up @@ -172,6 +174,10 @@ func init() {
flag.IntVar(&pprofPort, "pprof-port", 6060, "pprof HTTP server port")
flag.IntVar(&pprofBlockRate, "pprof-block-rate", 0, "pprof block rate")
flag.IntVar(&pprofMutexRate, "pprof-mutex-rate", 0, "pprof mutex rate")

// tick intervals for retrying the creation/deletion of pipelines which previously failed to create/delete
flag.DurationVar(&retryFailedCreatingPipelinesTick, "retry-creating-failed-pipelines-tick", time.Minute, "tick interval for re-attempting to create pipelines which failed to create")
flag.DurationVar(&retryFailedDeletePipelinesTick, "retry-deleting-failed-pipelines-tick", time.Minute, "tick interval for re-attempting to delete pipelines which failed to terminate")
}

func getNamespace() string {
Expand Down Expand Up @@ -322,8 +328,11 @@ func main() {
logger.WithError(err).Fatal("Failed to start data engine chainer server")
}
defer cs.Stop()

ctx, stopPipelinePollers := context.WithCancel(context.Background())
defer stopPipelinePollers()
go func() {
err := cs.StartGrpcServer(chainerPort)
err := cs.StartGrpcServer(ctx, retryFailedCreatingPipelinesTick, retryFailedDeletePipelinesTick, chainerPort)
if err != nil {
log.WithError(err).Fatalf("Chainer server start error")
}
Expand Down Expand Up @@ -382,7 +391,7 @@ func main() {
)
defer s.Stop()

err = s.StartGrpcServers(allowPlaintxt, schedulerPort, schedulerMtlsPort)
err = s.StartGrpcServers(ctx, allowPlaintxt, schedulerPort, schedulerMtlsPort, retryFailedCreatingPipelinesTick, retryFailedDeletePipelinesTick)
if err != nil {
logger.WithError(err).Fatal("Failed to start server gRPC servers")
}
Expand Down Expand Up @@ -421,6 +430,7 @@ func main() {
s.StopSendServerEvents()
s.StopSendExperimentEvents()
s.StopSendPipelineEvents()
stopPipelinePollers()
s.StopSendControlPlaneEvents()
as.StopAgentStreams()

Expand Down
5 changes: 4 additions & 1 deletion scheduler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ require (
sigs.k8s.io/yaml v1.5.0 // indirect
)

tool go.uber.org/mock/mockgen
tool (
go.uber.org/mock/mockgen
golang.org/x/tools/cmd/stringer
)

replace github.com/seldonio/seldon-core/components/tls/v2 => ../components/tls

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ func GetPipelineStatus(
messageStr += fmt.Sprintf("%d/%d failed ", failedCount, len(streams))
}

failedTerminatingCount := cr.GetCountResourceWithStatus(pipelineName, pipeline.PipelineFailedTerminating)
if failedTerminatingCount > 0 {
messageStr += fmt.Sprintf("%d/%d failed ", failedTerminatingCount, len(streams))
}

rebalancingCount := cr.GetCountResourceWithStatus(pipelineName, pipeline.PipelineRebalancing)
if rebalancingCount > 0 {
messageStr += fmt.Sprintf("%d/%d rebalancing ", rebalancingCount, len(streams))
Expand All @@ -170,8 +175,8 @@ func GetPipelineStatus(
}

if message.Update.Op == chainer.PipelineUpdateMessage_Delete {
if failedCount > 0 {
return pipeline.PipelineFailed, messageStr
if failedTerminatingCount > 0 {
return pipeline.PipelineFailedTerminating, messageStr
}
if terminatedCount == len(streams) {
return pipeline.PipelineTerminated, messageStr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,9 @@ func TestGetPipelineStatus(t *testing.T) {
name: "delete failed",
op: chainer.PipelineUpdateMessage_Delete,
statuses: map[string]pipeline.PipelineStatus{
"a": pipeline.PipelineFailed,
"a": pipeline.PipelineFailedTerminating,
},
expected: pipeline.PipelineFailed,
expected: pipeline.PipelineFailedTerminating,
},
{
name: "rebalance failed",
Expand Down
Loading
Loading