Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions controller/expectations/cache_expectation.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func NewxCacheExpectations(reader client.Reader, scheme *runtime.Scheme, clock c
}
}

func (r *CacheExpectations) CreateExpectations(controllerKey string) (*CacheExpectation, error) {
return r.initExpectations(controllerKey)
}

// GetExpectations returns the ControlleeExpectations of the given controller.
func (r *CacheExpectations) GetExpectations(controllerKey string) (*CacheExpectation, bool, error) {
exp, exists, err := r.GetByKey(controllerKey)
Expand Down Expand Up @@ -182,6 +186,16 @@ func (e *CacheExpectation) Fulfilled() bool {
return satisfied
}

func (e *CacheExpectation) FulFilledFor(gvk schema.GroupVersionKind, namespace, name string) bool {
key := e.getKey(gvk, namespace, name)
item, ok, err := e.items.GetByKey(key)
if err != nil || !ok {
return true
}
eitem := item.(*CacheExpectationItem)
return eitem.Fulfilled()
}

func (e *CacheExpectation) ExpectCreation(gvk schema.GroupVersionKind, namespace, name string) error {
return e.expect(e.getKey(gvk, namespace, name), e.creationObserved(gvk, namespace, name))
}
Expand Down
56 changes: 56 additions & 0 deletions controller/merge/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2024-2025 KusionStack Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package merge

import (
"encoding/json"

"k8s.io/apimachinery/pkg/util/strategicpatch"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ThreeWayMergeToTarget Use three-way merge to get a updated instance.
func ThreeWayMergeToTarget(currentRevisionTarget, updateRevisionTarget, currentTarget, emptyObj client.Object) error {
currentRevisionTargetBytes, err := json.Marshal(currentRevisionTarget)
if err != nil {
return err
}
updateRevisionTargetBytes, err := json.Marshal(updateRevisionTarget)
if err != nil {
return err
}

// 1. find the extra changes based on current revision
patch, err := strategicpatch.CreateTwoWayMergePatch(currentRevisionTargetBytes, updateRevisionTargetBytes, emptyObj)
if err != nil {
return err
}

// 2. apply above changes to current target object
// We don't apply the diff between currentTarget and currentRevisionTarget to updateRevisionTarget,
// because the TargetTemplate changes should have the highest priority.
currentTargetBytes, err := json.Marshal(currentTarget)
if err != nil {
return err
}
if updateRevisionTargetBytes, err = strategicpatch.StrategicMergePatch(currentTargetBytes, patch, emptyObj); err != nil {
return err
}

err = json.Unmarshal(updateRevisionTargetBytes, currentTarget)
return err
}
74 changes: 74 additions & 0 deletions controller/utils/slow_start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2024-2025 The KusionStack Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import "sync"

const (
SlowStartInitialBatchSize = 1
)

func intMin(l, r int) int {
if l < r {
return l
}

return r
}

// SlowStartBatch tries to call the provided function a total of 'count' times,
// starting slow to check for errors, then speeding up if calls succeed.
//
// It groups the calls into batches, starting with a group of initialBatchSize.
// Within each batch, it may call the function multiple times concurrently.
//
// If a whole batch succeeds, the next batch may get exponentially larger.
// If there are any failures in a batch, all remaining batches are skipped
// after waiting for the current batch to complete.
//
// It returns the number of successful calls to the function.
func SlowStartBatch(count, initialBatchSize int, shortCircuit bool, fn func(int, error) error) (int, error) {
remaining := count
successes := 0
index := 0
var gotErr error
for batchSize := intMin(remaining, initialBatchSize); batchSize > 0; batchSize = intMin(2*batchSize, remaining) {
errCh := make(chan error, batchSize)
var wg sync.WaitGroup
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
go func(index int) {
defer wg.Done()
if err := fn(index, gotErr); err != nil {
errCh <- err
}
}(index)
index++
}
wg.Wait()
curSuccesses := batchSize - len(errCh)
successes += curSuccesses
if len(errCh) > 0 {
gotErr = <-errCh
if shortCircuit {
return successes, gotErr
}
}
remaining -= batchSize
}
return successes, gotErr
}
107 changes: 107 additions & 0 deletions controller/utils/slow_start_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
Copyright 2024-2025 The KusionStack Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"fmt"
"sync"
"testing"
)

const expectedErrMsg = "expected error"

func TestSlowStartBatch(t *testing.T) {
testcases := []struct {
name string
count int
errorIndex *int
shortcut bool

expectedCallCount int
expectedSuccessCount int
}{
{
name: "happy pass",
count: 10,
errorIndex: nil,
shortcut: false,
expectedCallCount: 10,
expectedSuccessCount: 10,
},
{
name: "failed without shortcut",
count: 10,
errorIndex: intPointer(5),
shortcut: false,
expectedCallCount: 10,
expectedSuccessCount: 9,
},
{
name: "failed without shortcut",
count: 10,
errorIndex: intPointer(5),
shortcut: true,
expectedCallCount: 7, // 1 count in batch 1 + 2 counts in batch 2 + 4 counts in batch 3
expectedSuccessCount: 6,
},
}

for _, testcase := range testcases {
callCounter := intPointer(0)
successCount, err := SlowStartBatch(testcase.count, 1, testcase.shortcut, buildFn(callCounter, testcase.errorIndex))
if testcase.errorIndex == nil {
if err != nil {
t.Fatalf("case %s has unexpected err: %s", testcase.name, err)
}
} else {
if err == nil {
t.Fatalf("case %s has no expected err", testcase.name)
} else if err.Error() != expectedErrMsg {
t.Fatalf("case %s has expected err with unexpected message: %s", testcase.name, err)
}
}

if successCount != testcase.expectedSuccessCount {
t.Fatalf("case %s gets unexpected success count: expected %d, got %d", testcase.name, testcase.expectedSuccessCount, successCount)
}

if *callCounter != testcase.expectedCallCount {
t.Fatalf("case %s gets unexpected call count: expected %d, got %d", testcase.name, testcase.expectedCallCount, *callCounter)
}
}
}

func buildFn(callCounter, errIdx *int) func(int, error) error {
lock := sync.Mutex{}
return func(i int, err error) error {
lock.Lock()
defer func() {
*callCounter++
lock.Unlock()
}()

if errIdx != nil && *callCounter == *errIdx {
return fmt.Errorf(expectedErrMsg)
}

return nil
}
}

func intPointer(val int) *int {
return &val
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
k8s.io/klog/v2 v2.100.1
k8s.io/kubectl v0.28.4
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
kusionstack.io/kube-api v0.2.0
kusionstack.io/kube-api v0.6.6
sigs.k8s.io/controller-runtime v0.17.3
)

Expand All @@ -31,6 +31,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/evanphx/json-patch v5.7.0+incompatible // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/zapr v1.2.4 // indirect
Expand All @@ -50,6 +51,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
Expand Down Expand Up @@ -120,6 +122,5 @@ replace (
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.22.2
k8s.io/system-validators => k8s.io/system-validators v1.5.0
k8s.io/utils => k8s.io/utils v0.0.0-20240102154912-e7106e64919e
kusionstack.io/kube-api => kusionstack.io/kube-api v0.2.0
sigs.k8s.io/controller-runtime => sigs.k8s.io/controller-runtime v0.10.3
)
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ github.com/daviddengcn/go-colortext v0.0.0-20160507010035-511bcaf42ccd/go.mod h1
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8=
github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
Expand Down Expand Up @@ -338,6 +340,7 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU=
github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
Expand Down Expand Up @@ -854,6 +857,8 @@ k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCf
k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
kusionstack.io/kube-api v0.2.0 h1:40SHCpm9RdabTUTVjhsHWoX+h7djy4jMYYTcbnJ9SQc=
kusionstack.io/kube-api v0.2.0/go.mod h1:fYwuojoLs71ox8uyvyKNsJU4CtPtptSfJ3PUSt/3Hgg=
kusionstack.io/kube-api v0.6.6 h1:gMLUQL/eectQxkosnlv1m/R2xieY2crETliWRcxBICg=
kusionstack.io/kube-api v0.6.6/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.22/go.mod h1:LEScyzhFmoF5pso/YSeBstl57mOzx9xlU9n85RGrDQg=
sigs.k8s.io/controller-runtime v0.10.3 h1:s5Ttmw/B4AuIbwrXD3sfBkXwnPMMWrqpVj4WRt1dano=
Expand Down
2 changes: 1 addition & 1 deletion hack/boilerplate.go.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2024 KusionStack Authors.
* Copyright 2024-2025 KusionStack Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Loading