Skip to content

Commit 3611f9b

Browse files
authored
Some improvements to the operator and substrate (#206)
* Update CNI and Karpenter versions, update local path for cluster config
* Update etcd mig example config
* Minor fix for logging format
1 parent ede4336 commit 3611f9b

File tree

11 files changed

+122
-35
lines changed

11 files changed

+122
-35
lines changed

operator/docs/examples/etcd-with-mixed-instances.yaml

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
## Create hollow pods to pre-provision mixed instances for etcd pods
44
```bash
55
ZONES=("a" "b" "c")
6-
SIZES=("m5.large" "t3.large" "c5.2xlarge")
7-
CONTROL_PLANE=foo
6+
SIZES=("t3.medium" "t3.medium" "t2.medium")
7+
CONTROL_PLANE=foo
88
for i in {1..3}; do
99
cat <<EOF | kubectl apply -f -
1010
apiVersion: v1
@@ -34,6 +34,59 @@ apiVersion: kit.k8s.sh/v1alpha1
3434
kind: ControlPlane
3535
metadata:
3636
name: ${CONTROL_PLANE} # Desired Cluster name
37-
namespace: guest
37+
spec:
38+
kubernetesVersion: "1.21"
39+
master:
40+
apiServer:
41+
replicas: 2
42+
spec:
43+
containers:
44+
- name: apiserver
45+
args:
46+
- --max-requests-inflight=400
47+
- --max-mutating-requests-inflight=200
48+
controllerManager:
49+
spec:
50+
containers:
51+
- name: controller-manager
52+
args:
53+
- --controllers=*
54+
- --kube-api-qps=300
55+
- --kube-api-burst=400
56+
scheduler:
57+
spec:
58+
containers:
59+
- name: scheduler
60+
args:
61+
- --kube-api-qps=300
62+
- --kube-api-burst=400
63+
etcd:
64+
spec:
65+
containers:
66+
- name: etcd
67+
resources:
68+
requests:
69+
memory: 2Gi
70+
EOF
71+
```
72+
73+
# Create DataPlane nodes for the provisioned guest cluster
74+
```bash
75+
CONTROL_PLANE=foo
76+
cat <<EOF | kubectl apply -f -
77+
apiVersion: kit.k8s.sh/v1alpha1
78+
kind: DataPlane
79+
metadata:
80+
name: ${CONTROL_PLANE}-nodes
81+
spec:
82+
clusterName: ${CONTROL_PLANE} # Desired Cluster Name
83+
nodeCount: 10
84+
subnetSelector:
85+
kit.aws/substrate: ${MANAGEMENT_CLUSTER_NAME}
86+
instanceTypes:
87+
- c4.xlarge
88+
- c5.xlarge
89+
- c4.4xlarge
90+
- c5.4xlarge
3891
EOF
3992
```

operator/pkg/controllers/master/certificates.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ func kubeAPIServerCertConfig(hostname string, nn types.NamespacedName) *secrets.
7979
CommonName: "kube-apiserver",
8080
AltNames: certutil.AltNames{
8181
DNSNames: []string{hostname, "localhost", "kubernetes", "kubernetes.default",
82-
"kubernetes.default.svc", "kubernetes.default.svc.cluster.local"},
82+
"kubernetes.default.svc", "kubernetes.default.svc.cluster.local",
83+
fmt.Sprintf("%s-cp.%s.svc.cluster.local", nn.Name, nn.Namespace),
84+
},
8385
IPs: []net.IP{net.IPv4(127, 0, 0, 1), apiServerVirtualIP()},
8486
},
8587
},

operator/pkg/utils/imageprovider/imageprovider.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ var (
1919
"1.19": kubeVersion119Tag,
2020
"1.20": kubeVersion120Tag,
2121
"1.21": kubeVersion121Tag,
22+
"1.22": kubeVersion122Tag,
2223
}
2324
)
2425

@@ -31,6 +32,7 @@ const (
3132
kubeVersion119Tag = "v1.19.13-eks-1-19-9"
3233
kubeVersion120Tag = "v1.20.7-eks-1-20-6"
3334
kubeVersion121Tag = "v1.21.2-eks-1-21-4"
35+
kubeVersion122Tag = "v1.22.6-eks-1-22-5"
3436
repositoryName = "public.ecr.aws/eks-distro/"
3537
busyBoxImage = "public.ecr.aws/docker/library/busybox:stable"
3638
)

substrate/cmd/kitctl/root.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ import (
1818
"context"
1919
"fmt"
2020
"io"
21+
"math/rand"
2122
"os"
23+
"time"
2224

2325
"github.com/spf13/cobra"
2426
"github.com/spf13/pflag"
@@ -36,6 +38,7 @@ func main() {
3638
}
3739
ctx, cancel := context.WithCancel(context.Background())
3840
defer cancel()
41+
rand.Seed(time.Now().UnixNano())
3942
logger := zap.New(zapcore.NewCore(zapcore.NewConsoleEncoder(zapcore.EncoderConfig{MessageKey: "message"}),
4043
customLogWriteTo(ctx, os.Stdout), zap.LevelEnablerFunc(func(level zapcore.Level) bool {
4144
return level >= logLevel

substrate/pkg/controller/substrate/cluster/addons/awsvpccni.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (a *AWSVPCCNI) Create(ctx context.Context, substrate *v1alpha1.Substrate) (
3333
Namespace: "kube-system",
3434
Name: "aws-vpc-cni",
3535
Repository: "https://aws.github.io/eks-charts",
36-
Version: "1.1.13",
36+
Version: "1.1.16",
3737
}); err != nil {
3838
return reconcile.Result{}, fmt.Errorf("applying chart, %w", err)
3939
}

substrate/pkg/controller/substrate/cluster/addons/karpenter.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (k *Karpenter) Create(ctx context.Context, substrate *v1alpha1.Substrate) (
6666
Namespace: "karpenter",
6767
Name: "karpenter",
6868
Repository: "https://charts.karpenter.sh",
69-
Version: "0.7.3",
69+
Version: "0.9.0",
7070
CreateNamespace: true,
7171
Values: map[string]interface{}{
7272
"clusterName": substrate.Name,
@@ -90,13 +90,11 @@ func (k *Karpenter) Create(ctx context.Context, substrate *v1alpha1.Substrate) (
9090
if err != nil {
9191
return reconcile.Result{}, fmt.Errorf("initializing client, %w", err)
9292
}
93+
subnets := append(substrate.Status.Infrastructure.PublicSubnetIDs, substrate.Status.Infrastructure.PrivateSubnetIDs...)
9394
// Tag EC2 Resources
9495
if _, err := k.EC2.CreateTagsWithContext(ctx, &ec2.CreateTagsInput{
95-
Resources: aws.StringSlice(append(
96-
substrate.Status.Infrastructure.PublicSubnetIDs,
97-
aws.StringValue(substrate.Status.Infrastructure.SecurityGroupID),
98-
)),
99-
Tags: []*ec2.Tag{{Key: aws.String("karpenter.sh/discovery"), Value: aws.String(substrate.Name)}},
96+
Resources: aws.StringSlice(append(subnets, aws.StringValue(substrate.Status.Infrastructure.SecurityGroupID))),
97+
Tags: []*ec2.Tag{{Key: aws.String("karpenter.sh/discovery"), Value: aws.String(substrate.Name)}},
10098
}); err != nil {
10199
return reconcile.Result{}, fmt.Errorf("tagging resources, %w", err)
102100
}

substrate/pkg/controller/substrate/cluster/addons/tekton.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ rules:
3737
resources: ["controlplanes", "dataplanes"]
3838
verbs: ["get", "list", "watch", "create", "update", "delete", "patch"]
3939
- apiGroups: [""]
40-
resources: ["serviceaccounts", "secrets", "namespaces", "nodes"]
40+
resources: ["serviceaccounts", "secrets", "namespaces", "nodes", "persistentvolumeclaims"]
4141
verbs: ["get", "list", "watch", "create", "update", "delete", "patch"]
4242
- apiGroups: ["apiextensions.k8s.io"]
4343
resources: ["customresourcedefinitions"]

substrate/pkg/controller/substrate/cluster/config.go

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ import (
4848
)
4949

5050
const (
51-
ClusterCertsBasePath = "/tmp/"
5251
kubeconfigPath = "/etc/kubernetes"
5352
kubeconfigFile = "etc/kubernetes/admin.conf"
5453
certPKIPath = "/etc/kubernetes/pki"
@@ -66,15 +65,21 @@ const (
6665
)
6766

6867
type Config struct {
69-
S3 *s3.S3
70-
STS *sts.STS
71-
S3Uploader *s3manager.Uploader
68+
S3 *s3.S3
69+
STS *sts.STS
70+
S3Uploader *s3manager.Uploader
71+
clusterConfigPath string
7272
}
7373

7474
func (c *Config) Create(ctx context.Context, substrate *v1alpha1.Substrate) (reconcile.Result, error) {
7575
if substrate.Status.Cluster.APIServerAddress == nil {
7676
return reconcile.Result{Requeue: true}, nil
7777
}
78+
if c.clusterConfigPath == "" {
79+
if err := c.ensureKitEnvDir(); err != nil {
80+
return reconcile.Result{}, fmt.Errorf("ensuring kit env dir, %w", err)
81+
}
82+
}
7883
// ensure S3 bucket
7984
if err := c.ensureBucket(ctx, substrate); err != nil {
8085
return reconcile.Result{}, fmt.Errorf("ensuring S3 bucket, %w", err)
@@ -102,11 +107,11 @@ func (c *Config) Create(ctx context.Context, substrate *v1alpha1.Substrate) (rec
102107
}
103108
// upload to s3 bucket
104109
if err := c.S3Uploader.UploadWithIterator(ctx, NewDirectoryIterator(
105-
aws.StringValue(discovery.Name(substrate)), path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate))))); err != nil {
110+
aws.StringValue(discovery.Name(substrate)), path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate))))); err != nil {
106111
return reconcile.Result{}, fmt.Errorf("uploading to S3 %w", err)
107112
}
108113
logging.FromContext(ctx).Debugf("Uploaded cluster configuration to s3://%s", aws.StringValue(discovery.Name(substrate)))
109-
substrate.Status.Cluster.KubeConfig = ptr.String(path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), kubeconfigFile))
114+
substrate.Status.Cluster.KubeConfig = ptr.String(path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), kubeconfigFile))
110115
return reconcile.Result{}, nil
111116
}
112117

@@ -124,7 +129,7 @@ func (c *Config) Delete(ctx context.Context, substrate *v1alpha1.Substrate) (rec
124129
} else {
125130
logging.FromContext(ctx).Infof("Deleted S3 bucket %s", aws.StringValue(discovery.Name(substrate)))
126131
}
127-
return reconcile.Result{}, os.RemoveAll(path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate))))
132+
return reconcile.Result{}, os.RemoveAll(path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate))))
128133
}
129134

130135
func ErrNoSuchBucket(err error) bool {
@@ -137,7 +142,7 @@ func ErrNoSuchBucket(err error) bool {
137142
}
138143

139144
func (c *Config) generateCerts(cfg *kubeadm.InitConfiguration, substrate *v1alpha1.Substrate) error {
140-
cfg.CertificatesDir = path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), certPKIPath)
145+
cfg.CertificatesDir = path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), certPKIPath)
141146
certTree, err := certs.GetDefaultCertList().AsMap().CertTree()
142147
if err != nil {
143148
return err
@@ -151,7 +156,7 @@ func (c *Config) generateCerts(cfg *kubeadm.InitConfiguration, substrate *v1alph
151156

152157
func (c *Config) kubeConfigs(cfg *kubeadm.InitConfiguration, substrate *v1alpha1.Substrate) error {
153158
// Generate Kube config files for master components
154-
kubeConfigDir := path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), kubeconfigPath)
159+
kubeConfigDir := path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), kubeconfigPath)
155160
for _, kubeConfigFileName := range []string{
156161
kubeadmconstants.AdminKubeConfigFileName,
157162
kubeadmconstants.KubeletKubeConfigFileName,
@@ -165,7 +170,7 @@ func (c *Config) kubeConfigs(cfg *kubeadm.InitConfiguration, substrate *v1alpha1
165170
}
166171

167172
func (c *Config) generateStaticPodManifests(cfg *kubeadm.InitConfiguration, substrate *v1alpha1.Substrate) error {
168-
manifestDir := path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), clusterManifestPath)
173+
manifestDir := path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), clusterManifestPath)
169174
// etcd phase adds cfg.CertificatesDir to static pod yaml for pods to read the certs from
170175
cfg.CertificatesDir = certPKIPath
171176
if err := etcd.CreateLocalEtcdStaticPodManifestFile(
@@ -176,7 +181,7 @@ func (c *Config) generateStaticPodManifests(cfg *kubeadm.InitConfiguration, subs
176181
kubeadmconstants.KubeAPIServer,
177182
kubeadmconstants.KubeControllerManager,
178183
kubeadmconstants.KubeScheduler} {
179-
err := controlplane.CreateStaticPodFiles(path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), clusterManifestPath), "",
184+
err := controlplane.CreateStaticPodFiles(path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), clusterManifestPath), "",
180185
&cfg.ClusterConfiguration, &cfg.LocalAPIEndpoint, false, componentName)
181186
if err != nil {
182187
return fmt.Errorf("creating static pod file for %v, %w", componentName, err)
@@ -200,7 +205,7 @@ func (c *Config) ensureBucket(ctx context.Context, substrate *v1alpha1.Substrate
200205
}
201206

202207
func (c *Config) kubeletSystemService(cfg *kubeadm.InitConfiguration, substrate *v1alpha1.Substrate) error {
203-
localDir := path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), kubeletSystemdPath)
208+
localDir := path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), kubeletSystemdPath)
204209
if _, err := os.Stat(localDir); err != nil {
205210
if !os.IsNotExist(err) {
206211
return err
@@ -295,7 +300,7 @@ func (c *Config) ensureAuthenticatorConfig(ctx context.Context, substrate *v1alp
295300
return fmt.Errorf("creating authenticator config, %w", err)
296301
}
297302
logging.FromContext(ctx).Debugf("Created config map for authenticator")
298-
configDir := path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)), authenticatorConfigDir)
303+
configDir := path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)), authenticatorConfigDir)
299304
if err := os.MkdirAll(configDir, 0700); err != nil {
300305
return fmt.Errorf("failed to create directory, %w", err)
301306
}
@@ -318,17 +323,30 @@ func (c *Config) staticPodSpecForAuthenticator(ctx context.Context, substrate *v
318323
if err != nil {
319324
return fmt.Errorf("failed to marshal config map manifest, %w", err)
320325
}
321-
if err := ioutil.WriteFile(path.Join(ClusterCertsBasePath, aws.StringValue(discovery.Name(substrate)),
326+
if err := ioutil.WriteFile(path.Join(c.clusterConfigPath, aws.StringValue(discovery.Name(substrate)),
322327
clusterManifestPath, "aws-iam-authenticator.yaml"), serialized, 0644); err != nil {
323328
return fmt.Errorf("writing authenticator pod yaml, %w", err)
324329
}
325330
return nil
326331
}
327332

333+
func (c *Config) ensureKitEnvDir() error {
334+
home, err := os.UserHomeDir()
335+
if err != nil {
336+
return fmt.Errorf("finding HOME dir %v", err)
337+
}
338+
c.clusterConfigPath = filepath.Join(home, ".kit/env")
339+
if err := os.MkdirAll(c.clusterConfigPath, 0755); err != nil {
340+
return fmt.Errorf("creating .kit/env dir %v", err)
341+
}
342+
return nil
343+
}
344+
328345
// DirectoryIterator represents an iterator of a specified directory
329346
type DirectoryIterator struct {
330347
filePaths []string
331348
bucket string
349+
localDir string
332350
next struct {
333351
path string
334352
f *os.File
@@ -351,6 +369,7 @@ func NewDirectoryIterator(bucket, dir string) s3manager.BatchUploadIterator {
351369
return &DirectoryIterator{
352370
filePaths: paths,
353371
bucket: bucket,
372+
localDir: dir,
354373
}
355374
}
356375

@@ -373,6 +392,8 @@ func (d *DirectoryIterator) Err() error {
373392

374393
// UploadObject uploads a file
375394
func (d *DirectoryIterator) UploadObject() s3manager.BatchUploadObject {
395+
// trim the local path before uploading to S3
396+
d.next.path = strings.TrimPrefix(d.next.path, d.localDir)
376397
return s3manager.BatchUploadObject{
377398
Object: &s3manager.UploadInput{Bucket: &d.bucket, Key: &d.next.path, Body: d.next.f},
378399
After: d.next.f.Close,

substrate/pkg/controller/substrate/cluster/launchtemplate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ while [ true ]; do
9898
echo "\$(date) Syncing S3 files for \$dir"
9999
mkdir -p \$dir
100100
existing_checksum=\$(ls -alR \$dir | md5sum)
101-
aws s3 sync s3://%[2]s/tmp/%[2]s\$dir "\$dir"
101+
aws s3 sync s3://%[2]s\$dir "\$dir"
102102
new_checksum=\$(ls -alR \$dir | md5sum)
103103
if [ "\$new_checksum" != "\$existing_checksum" ]; then
104104
echo "Successfully synced from S3 \$dir"

substrate/pkg/controller/substrate/controller.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ package substrate
1717
import (
1818
"context"
1919
"fmt"
20+
"math/rand"
2021
"reflect"
2122
"sync"
2223
"time"
2324

2425
"github.com/aws/aws-sdk-go/aws"
26+
"github.com/aws/aws-sdk-go/aws/awserr"
2527
"github.com/aws/aws-sdk-go/aws/endpoints"
2628
"github.com/aws/aws-sdk-go/aws/request"
2729
"github.com/aws/aws-sdk-go/aws/session"
@@ -39,6 +41,7 @@ import (
3941
"go.uber.org/multierr"
4042
"k8s.io/apimachinery/pkg/util/runtime"
4143
"k8s.io/client-go/util/workqueue"
44+
"knative.dev/pkg/logging"
4245
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4346
)
4447

@@ -101,17 +104,22 @@ func (c *Controller) Reconcile(ctx context.Context, substrate *v1alpha1.Substrat
101104
}
102105
result, err := f(ctx, mutable)
103106
if err != nil {
104-
errs[i] = fmt.Errorf("reconciling %s, %w", reflect.ValueOf(resource).Elem().Type(), err)
105-
cancel()
106-
return
107+
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == "RequestLimitExceeded" {
108+
logging.FromContext(ctx).Debugf("RequestLimitExceeded while reconciling %s, err %w", reflect.ValueOf(resource).Elem().Type(), err)
109+
} else {
110+
logging.FromContext(ctx).Errorf("reconciling %s, err %v", reflect.ValueOf(resource).Elem().Type(), err)
111+
errs[i] = fmt.Errorf("reconciling %s, %w", reflect.ValueOf(resource).Elem().Type(), err)
112+
cancel()
113+
return
114+
}
107115
}
108116
c.Lock()
109117
runtime.Must(mergo.Merge(substrate, mutable))
110118
c.Unlock()
111119
if !result.Requeue && result.RequeueAfter == 0 {
112120
return
113121
}
114-
time.Sleep(result.RequeueAfter + time.Second*1)
122+
time.Sleep(result.RequeueAfter + time.Duration(rand.Intn(3000))*time.Millisecond)
115123
}
116124
})
117125
return multierr.Combine(errs...)

0 commit comments

Comments
 (0)