diff --git a/apiexport/provider.go b/apiexport/provider.go index 5b6f683..c2d0190 100644 --- a/apiexport/provider.go +++ b/apiexport/provider.go @@ -41,6 +41,8 @@ import ( kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" "github.com/kcp-dev/logicalcluster/v3" + + mcpcache "github.com/kcp-dev/multicluster-provider/internal/cache" ) var _ multicluster.Provider = &Provider{} @@ -52,7 +54,7 @@ var _ multicluster.Provider = &Provider{} type Provider struct { config *rest.Config scheme *runtime.Scheme - cache WildcardCache + cache mcpcache.WildcardCache object client.Object log logr.Logger @@ -70,7 +72,7 @@ type Options struct { // WildcardCache is the wildcard cache to use for the provider. If this is // nil, a new wildcard cache will be created for the given rest.Config. - WildcardCache WildcardCache + WildcardCache mcpcache.WildcardCache // ObjectToWatch is the object type that the provider watches via a /clusters/* // wildcard endpoint to extract information about logical clusters joining and @@ -92,7 +94,7 @@ func New(cfg *rest.Config, options Options) (*Provider, error) { } if options.WildcardCache == nil { var err error - options.WildcardCache, err = NewWildcardCache(cfg, cache.Options{ + options.WildcardCache, err = mcpcache.NewWildcardCache(cfg, cache.Options{ Scheme: options.Scheme, }) if err != nil { @@ -125,7 +127,7 @@ func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error { if err != nil { return fmt.Errorf("failed to get %T informer: %w", p.object, err) } - shInf, _, _, err := p.cache.getSharedInformer(p.object) + shInf, _, _, err := p.cache.GetSharedInformer(p.object) if err != nil { return fmt.Errorf("failed to get shared informer: %w", err) } @@ -155,7 +157,7 @@ func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error { // create new scoped cluster. clusterCtx, cancel := context.WithCancel(ctx) - cl, err := newScopedCluster(p.config, clusterName, p.cache, p.scheme) + cl, err := mcpcache.NewScopedCluster(p.config, clusterName, p.cache, p.scheme) if err != nil { p.log.Error(err, "failed to create cluster", "cluster", clusterName) cancel() diff --git a/envtest/workspaces.go b/envtest/workspaces.go index 2169496..37acfd7 100644 --- a/envtest/workspaces.go +++ b/envtest/workspaces.go @@ -170,6 +170,69 @@ func NewWorkspaceFixture(t TestingT, clusterClient kcpclient.ClusterClient, pare return ws, parent.Join(ws.Name) } +// NewInitializingWorkspaceFixture creates a new workspace under the given parent +// using the given client, and waits for it to be stuck in the initializing phase. +func NewInitializingWorkspaceFixture(t TestingT, clusterClient kcpclient.ClusterClient, parent logicalcluster.Path, options ...WorkspaceOption) (*tenancyv1alpha1.Workspace, logicalcluster.Path) { + t.Helper() + + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + + ws := &tenancyv1alpha1.Workspace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "e2e-workspace-", + }, + Spec: tenancyv1alpha1.WorkspaceSpec{ + Type: tenancyv1alpha1.WorkspaceTypeReference{ + Name: tenancyv1alpha1.WorkspaceTypeName("universal"), + Path: "root", + }, + }, + } + for _, opt := range options { + opt(ws) + } + + // we are referring here to a WorkspaceType that may have just been created; if the admission controller + // does not have a fresh enough cache, our request will be denied as the admission controller does not know the + // type exists. 
Therefore, we can require.Eventually our way out of this problem. We expect users to create new + // types very infrequently, so we do not think this will be a serious UX issue in the product. + Eventually(t, func() (bool, string) { + err := clusterClient.Cluster(parent).Create(ctx, ws) + return err == nil, fmt.Sprintf("error creating workspace under %s: %v", parent, err) + }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to create %s workspace under %s", ws.Spec.Type.Name, parent) + + wsName := ws.Name + t.Cleanup(func() { + if os.Getenv("PRESERVE") != "" { + return + } + + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) + defer cancelFn() + + err := clusterClient.Cluster(parent).Delete(ctx, ws) + if apierrors.IsNotFound(err) || apierrors.IsForbidden(err) { + return // ignore not found and forbidden because this probably means the parent has been deleted + } + require.NoErrorf(t, err, "failed to delete workspace %s", wsName) + }) + + Eventually(t, func() (bool, string) { + err := clusterClient.Cluster(parent).Get(ctx, client.ObjectKey{Name: ws.Name}, ws) + require.Falsef(t, apierrors.IsNotFound(err), "workspace %s was deleted", parent.Join(ws.Name)) + require.NoError(t, err, "failed to get workspace %s", parent.Join(ws.Name)) + if actual, expected := ws.Status.Phase, corev1alpha1.LogicalClusterPhaseInitializing; actual != expected { + return false, fmt.Sprintf("workspace phase is %s, not %s\n\n%s", actual, expected, toYaml(t, ws)) + } + return true, "" + }, workspaceInitTimeout, time.Millisecond*100, "%s workspace %s is not stuck on initializing", ws.Spec.Type, parent.Join(ws.Name)) + + t.Logf("Created %s workspace %s as /clusters/%s on shard %q", ws.Spec.Type, parent.Join(ws.Name), ws.Spec.Cluster, WorkspaceShardOrDie(t, clusterClient, ws).Name) + + return ws, parent.Join(ws.Name) +} + // WorkspaceShard returns the shard that a workspace is scheduled on. func WorkspaceShard(ctx context.Context, kcpClient kcpclient.ClusterClient, ws *tenancyv1alpha1.Workspace) (*corev1alpha1.Shard, error) { shards := &corev1alpha1.ShardList{} diff --git a/examples/apiexport/main.go b/examples/apiexport/main.go index e5f746a..3c90e97 100644 --- a/examples/apiexport/main.go +++ b/examples/apiexport/main.go @@ -19,7 +19,6 @@ package main import ( "context" "fmt" - "net/http" "os" "github.com/spf13/pflag" @@ -139,9 +138,3 @@ func main() { os.Exit(1) } } - -type RoundTripperFunc func(*http.Request) (*http.Response, error) - -func (f RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { - return f(r) -} diff --git a/examples/initializingworkspaces/README.md b/examples/initializingworkspaces/README.md new file mode 100644 index 0000000..10759ff --- /dev/null +++ b/examples/initializingworkspaces/README.md @@ -0,0 +1,41 @@ +# `initializingworkspaces` Example Controller + +This folder contains an example controller for the `initializingworkspaces` provider implementation. It reconciles `ConfigMap` objects across kcp workspaces. + +It can be tested by applying the necessary manifests from the respective folder while connected to the `root` workspace of a kcp instance: + +```sh +$ kubectl apply -f ./manifests/bundle.yaml +workspacetype.tenancy.kcp.io/examples-initializingworkspaces-multicluster created +workspace.tenancy.kcp.io/example1 created +workspace.tenancy.kcp.io/example2 created +workspace.tenancy.kcp.io/example3 created +``` + +Then, start the example controller by passing the virtual workspace URL to it: + +```sh +$ go run . 
--server=$(kubectl get workspacetype examples-initializingworkspaces-multicluster -o jsonpath="{.status.virtualWorkspaces[0].url}") --initializer=root:examples-initializingworkspaces-multicluster +``` + +Observe the controller reconciling every logical cluster and creating the child workspace `initialized-workspace` and the `kcp-initializer-cm` ConfigMap in each workspace and removing the initializer when done. + +```sh +2025-07-24T10:26:58+02:00 INFO Starting to initialize cluster {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "0c69fc44-4886-42f8-b620-82a6fe96a165", "cluster": "1i6ttu8js47cs302"} +2025-07-24T10:26:58+02:00 INFO Creating child workspace {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "0c69fc44-4886-42f8-b620-82a6fe96a165", "cluster": "1i6ttu8js47cs302", "name": "initialized-workspace-1i6ttu8js47cs302"} +2025-07-24T10:26:58+02:00 INFO kcp-initializing-workspaces-provider disengaging non-initializing workspace {"cluster": "1cxhyp0xy8lartoi"} +2025-07-24T10:26:58+02:00 INFO Workspace created successfully {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "0c69fc44-4886-42f8-b620-82a6fe96a165", "cluster": "1i6ttu8js47cs302", "name": "initialized-workspace-1i6ttu8js47cs302"} +2025-07-24T10:26:58+02:00 INFO Reconciling ConfigMap {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "0c69fc44-4886-42f8-b620-82a6fe96a165", "cluster": "1i6ttu8js47cs302", "name": "kcp-initializer-cm", "uuid": ""} +2025-07-24T10:26:58+02:00 INFO ConfigMap created successfully {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "0c69fc44-4886-42f8-b620-82a6fe96a165", "cluster": "1i6ttu8js47cs302", "name": "kcp-initializer-cm", "uuid": "9a8a8d5d-d606-4e08-bb69-679719d94867"} +2025-07-24T10:26:58+02:00 INFO Removed initializer from LogicalCluster status {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "0c69fc44-4886-42f8-b620-82a6fe96a165", "cluster": "1i6ttu8js47cs302", "name": "cluster", "uuid": "4c2fd3cf-512f-45f4-a9d3-6886c6542ccf"} +2025-07-24T10:26:58+02:00 INFO Reconciling LogicalCluster {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "99bd39f0-3ea8-4805-9770-60f95127b5ac", "cluster": "2hwz9858cyir31hl", "logical cluster": {"owner":{"apiVersion":"tenancy.kcp.io/v1alpha1","resource":"workspaces","name":"example1","cluster":"root","uid":"1d79b26f-cfb8-40d5-934d-b4a61eb20f12"},"initializers":["root:universal","root:examples-initializingworkspaces-multicluster","system:apibindings"]}} +2025-07-24T10:26:58+02:00 INFO Starting to initialize cluster {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "99bd39f0-3ea8-4805-9770-60f95127b5ac", "cluster": "2hwz9858cyir31hl"} +2025-07-24T10:26:58+02:00 INFO Creating child workspace {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "99bd39f0-3ea8-4805-9770-60f95127b5ac", "cluster": "2hwz9858cyir31hl", "name": "initialized-workspace-2hwz9858cyir31hl"} 
+2025-07-24T10:26:58+02:00 INFO kcp-initializing-workspaces-provider disengaging non-initializing workspace {"cluster": "1i6ttu8js47cs302"} +2025-07-24T10:26:58+02:00 INFO Workspace created successfully {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "99bd39f0-3ea8-4805-9770-60f95127b5ac", "cluster": "2hwz9858cyir31hl", "name": "initialized-workspace-2hwz9858cyir31hl"} +2025-07-24T10:26:58+02:00 INFO Reconciling ConfigMap {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "99bd39f0-3ea8-4805-9770-60f95127b5ac", "cluster": "2hwz9858cyir31hl", "name": "kcp-initializer-cm", "uuid": ""} +2025-07-24T10:26:58+02:00 INFO ConfigMap created successfully {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "99bd39f0-3ea8-4805-9770-60f95127b5ac", "cluster": "2hwz9858cyir31hl", "name": "kcp-initializer-cm", "uuid": "87462d41-16b5-4617-9f7c-3894160576b7"} +2025-07-24T10:26:58+02:00 INFO Removed initializer from LogicalCluster status {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "99bd39f0-3ea8-4805-9770-60f95127b5ac", "cluster": "2hwz9858cyir31hl", "name": "cluster", "uuid": "cfeee05f-3cba-4766-b464-ba3ebe41a3fa"} +2025-07-24T10:26:58+02:00 INFO Reconciling LogicalCluster {"controller": "kcp-initializer-controller", "controllerGroup": "core.kcp.io", "controllerKind": "LogicalCluster", "reconcileID": "8ad43574-3862-4452-b1e0-e9daf1e67a54", "cluster": "2hwz9858cyir31hl", "logical cluster": {"owner":{"apiVersion":"tenancy.kcp.io/v1alpha1","resource":"workspaces","name":"example1","cluster":"root","uid":"1d79b26f-cfb8-40d5-934d-b4a61eb20f12"},"initializers":["root:universal","root:examples-initializingworkspaces-multicluster","system:apibindings"]}} +2025-07-24T10:26:59+02:00 INFO kcp-initializing-workspaces-provider disengaging non-initializing workspace {"cluster": "2hwz9858cyir31hl"} +``` diff --git a/examples/initializingworkspaces/main.go b/examples/initializingworkspaces/main.go new file mode 100644 index 0000000..a5ef86a --- /dev/null +++ b/examples/initializingworkspaces/main.go @@ -0,0 +1,203 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "context" + "fmt" + "os" + "slices" + + "github.com/spf13/pflag" + "go.uber.org/zap/zapcore" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" + + apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + "github.com/kcp-dev/kcp/sdk/apis/tenancy/initialization" + tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1" + + "github.com/kcp-dev/multicluster-provider/initializingworkspaces" +) + +func init() { + runtime.Must(corev1alpha1.AddToScheme(scheme.Scheme)) + runtime.Must(tenancyv1alpha1.AddToScheme(scheme.Scheme)) + runtime.Must(apisv1alpha1.AddToScheme(scheme.Scheme)) +} + +func main() { + var ( + server string + initializerName string + provider *initializingworkspaces.Provider + verbosity int + ) + + pflag.StringVar(&server, "server", "", "Override for kubeconfig server URL") + pflag.StringVar(&initializerName, "initializer", "initializer:example", "Name of the initializer to use") + pflag.IntVar(&verbosity, "v", 1, "Log verbosity level") + pflag.Parse() + + logOpts := zap.Options{ + Development: true, + Level: zapcore.Level(-verbosity), + } + log.SetLogger(zap.New(zap.UseFlagOptions(&logOpts))) + + ctx := signals.SetupSignalHandler() + entryLog := log.Log.WithName("entrypoint") + cfg := ctrl.GetConfigOrDie() + cfg = rest.CopyConfig(cfg) + + if server != "" { + cfg.Host = server + } + + entryLog.Info("Setting up manager") + opts := manager.Options{} + + var err error + provider, err = initializingworkspaces.New(cfg, initializingworkspaces.Options{InitializerName: initializerName}) + if err != nil { + entryLog.Error(err, "unable to construct cluster provider") + os.Exit(1) + } + + mgr, err := mcmanager.New(cfg, provider, opts) + if err != nil { + entryLog.Error(err, "unable to set up overall controller manager") + os.Exit(1) + } + + if err := mcbuilder.ControllerManagedBy(mgr). + Named("kcp-initializer-controller"). + For(&corev1alpha1.LogicalCluster{}). 
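+		// Each request carries the logical cluster name in addition to the object key,
+		// so the reconciler below fetches the matching scoped cluster and its client
+		// from the multicluster manager before doing any work.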
+ Complete(mcreconcile.Func( + func(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) { + log := log.FromContext(ctx).WithValues("cluster", req.ClusterName) + cl, err := mgr.GetCluster(ctx, req.ClusterName) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get cluster: %w", err) + } + client := cl.GetClient() + lc := &corev1alpha1.LogicalCluster{} + if err := client.Get(ctx, req.NamespacedName, lc); err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get logical cluster: %w", err) + } + + log.Info("Reconciling LogicalCluster", "logical cluster", lc.Spec) + initializer := corev1alpha1.LogicalClusterInitializer(initializerName) + // check if your initializer is still set on the logicalcluster + if slices.Contains(lc.Status.Initializers, initializer) { + log.Info("Starting to initialize cluster") + workspaceName := fmt.Sprintf("initialized-workspace-%s", req.ClusterName) + ws := &tenancyv1alpha1.Workspace{} + err = client.Get(ctx, ctrlclient.ObjectKey{Name: workspaceName}, ws) + if err != nil { + if !apierrors.IsNotFound(err) { + log.Error(err, "Error checking for existing workspace") + return reconcile.Result{}, nil + } + + log.Info("Creating child workspace", "name", workspaceName) + ws = &tenancyv1alpha1.Workspace{ + ObjectMeta: ctrl.ObjectMeta{ + Name: workspaceName, + }, + } + + if err := client.Create(ctx, ws); err != nil { + log.Error(err, "Failed to create workspace") + return reconcile.Result{}, nil + } + log.Info("Workspace created successfully", "name", workspaceName) + } + + if ws.Status.Phase != corev1alpha1.LogicalClusterPhaseReady { + log.Info("Workspace not ready yet", "current-phase", ws.Status.Phase) + return reconcile.Result{Requeue: true}, nil + } + log.Info("Workspace is ready, proceeding to create ConfigMap") + + s := &corev1.ConfigMap{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "kcp-initializer-cm", + Namespace: "default", + }, + Data: map[string]string{ + "test-data": "example-value", + }, + } + log.Info("Reconciling ConfigMap", "name", s.Name, "uuid", s.UID) + if err := client.Create(ctx, s); err != nil { + return reconcile.Result{}, fmt.Errorf("failed to create configmap: %w", err) + } + log.Info("ConfigMap created successfully", "name", s.Name, "uuid", s.UID) + } + // Remove the initializer from the logical cluster status + // so that it won't be processed again. 
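+				// A stale event may still deliver a LogicalCluster whose initializer is
+				// already gone, so re-check before patching. The status patch below is a
+				// merge patch and only touches the initializers list.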
+ if !slices.Contains(lc.Status.Initializers, initializer) { + log.Info("Initializer already absent, skipping patch") + return reconcile.Result{}, nil + } + + patch := ctrlclient.MergeFrom(lc.DeepCopy()) + lc.Status.Initializers = initialization.EnsureInitializerAbsent(initializer, lc.Status.Initializers) + if err := client.Status().Patch(ctx, lc, patch); err != nil { + return reconcile.Result{}, err + } + log.Info("Removed initializer from LogicalCluster status", "name", lc.Name, "uuid", lc.UID) + return reconcile.Result{}, nil + }, + )); err != nil { + entryLog.Error(err, "failed to build controller") + os.Exit(1) + } + + if provider != nil { + entryLog.Info("Starting provider") + go func() { + if err := provider.Run(ctx, mgr); err != nil { + entryLog.Error(err, "unable to run provider") + os.Exit(1) + } + }() + } + + entryLog.Info("Starting manager") + if err := mgr.Start(ctx); err != nil { + entryLog.Error(err, "unable to run manager") + os.Exit(1) + } +} diff --git a/examples/initializingworkspaces/manifests/bundle.yaml b/examples/initializingworkspaces/manifests/bundle.yaml new file mode 100644 index 0000000..48407af --- /dev/null +++ b/examples/initializingworkspaces/manifests/bundle.yaml @@ -0,0 +1,54 @@ +# Copyright 2025 The KCP Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
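+
+# The WorkspaceType below sets `initializer: true`, so every Workspace created with this
+# type starts in the Initializing phase and carries an initializer named after the type
+# (here root:examples-initializingworkspaces-multicluster). The example controller removes
+# that initializer once it has finished its setup work.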
+ +apiVersion: tenancy.kcp.io/v1alpha1 +kind: WorkspaceType +metadata: + name: examples-initializingworkspaces-multicluster +spec: + initializer: true + defaultChildWorkspaceType: + name: universal + path: root + extend: + with: + - name: universal + path: root +--- +apiVersion: tenancy.kcp.io/v1alpha1 +kind: Workspace +metadata: + name: example1 +spec: + type: + name: examples-initializingworkspaces-multicluster + path: root +--- +apiVersion: tenancy.kcp.io/v1alpha1 +kind: Workspace +metadata: + name: example2 +spec: + type: + name: examples-initializingworkspaces-multicluster + path: root +--- +apiVersion: tenancy.kcp.io/v1alpha1 +kind: Workspace +metadata: + name: example3 +spec: + type: + name: examples-initializingworkspaces-multicluster + path: root diff --git a/hack/verify-boilerplate.sh b/hack/verify-boilerplate.sh index ef1c3fd..b30aabc 100755 --- a/hack/verify-boilerplate.sh +++ b/hack/verify-boilerplate.sh @@ -25,7 +25,7 @@ echo "Checking file boilerplates…" _tools/boilerplate \ -boilerplates hack/boilerplate \ -exclude .github \ - -exclude apiexport/forked_cache_reader.go \ + -exclude internal/cache/forked_cache_reader.go \ -exclude envtest _tools/boilerplate -boilerplates hack/boilerplate/kubernetes \ -exclude envtest/doc.go \ @@ -33,7 +33,7 @@ _tools/boilerplate -boilerplates hack/boilerplate/kubernetes \ -exclude envtest/scheme.go \ -exclude envtest/testing.go \ -exclude envtest/workspaces.go \ - envtest apiexport/forked_cache_reader.go + internal/cache/forked_cache_reader.go _tools/boilerplate \ -boilerplates hack/boilerplate \ envtest/doc.go \ diff --git a/initializingworkspaces/doc.go b/initializingworkspaces/doc.go new file mode 100644 index 0000000..5f1028a --- /dev/null +++ b/initializingworkspaces/doc.go @@ -0,0 +1,24 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Package initializingworkspaces provides a [sigs.k8s.io/multicluster-runtime] provider implementation for interacting with +initializing virtual workspace exposed by a [kcp] instance. This provider can be used for writing controllers +that reconcile Workspaces in the Initializing phase. + +[kcp]: https://kcp.io +*/ +package initializingworkspaces diff --git a/initializingworkspaces/provider.go b/initializingworkspaces/provider.go new file mode 100644 index 0000000..ed19147 --- /dev/null +++ b/initializingworkspaces/provider.go @@ -0,0 +1,275 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package initializingworkspaces + +import ( + "context" + "fmt" + "slices" + "strings" + "sync" + "time" + + "github.com/go-logr/logr" + "golang.org/x/sync/errgroup" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + toolscache "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/log" + + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + "sigs.k8s.io/multicluster-runtime/pkg/multicluster" + + kcpcorev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + "github.com/kcp-dev/logicalcluster/v3" + + mcpcache "github.com/kcp-dev/multicluster-provider/internal/cache" +) + +var _ multicluster.Provider = &Provider{} + +// Provider is a [sigs.k8s.io/multicluster-runtime/pkg/multicluster.Provider] that represents each [logical cluster] +// (in the kcp sense) having a specific initializer and exposed via the initializingworkspaces virtual workspace endpoint as a cluster in the [sigs.k8s.io/multicluster-runtime] sense. +// +// [logical cluster]: https://docs.kcp.io/kcp/latest/concepts/terminology/#logical-cluster +type Provider struct { + config *rest.Config + scheme *runtime.Scheme + wildcardCache mcpcache.WildcardCache + initializerName string + + log logr.Logger + + lock sync.RWMutex + clusters map[logicalcluster.Name]cluster.Cluster + cancelFns map[logicalcluster.Name]context.CancelFunc +} + +// Options are the options for creating a new instance of the initializing workspaces provider. +type Options struct { + // Scheme is the scheme to use for the provider. If this is nil, it defaults + // to the client-go scheme. + Scheme *runtime.Scheme + // WildcardCache is the wildcard cache to use for the provider. If this is + // nil, a new wildcard cache will be created for the given rest.Config. + WildcardCache mcpcache.WildcardCache + + // InitializerName is the name of the initializer to watch for in LogicalCluster.Status.Initializers + InitializerName string +} + +// New creates a new KCP initializing workspaces provider. +func New(cfg *rest.Config, options Options) (*Provider, error) { + if options.Scheme == nil { + options.Scheme = scheme.Scheme + } + + if options.WildcardCache == nil { + var err error + options.WildcardCache, err = mcpcache.NewWildcardCache(cfg, cache.Options{ + Scheme: options.Scheme, + }) + if err != nil { + return nil, fmt.Errorf("failed to create wildcard cache: %w", err) + } + } + + if options.InitializerName == "" { + return nil, fmt.Errorf("initializer name cannot be empty") + } + + return &Provider{ + config: cfg, + scheme: options.Scheme, + wildcardCache: options.WildcardCache, + initializerName: options.InitializerName, + log: log.Log.WithName("kcp-initializing-workspaces-provider"), + + clusters: map[logicalcluster.Name]cluster.Cluster{}, + cancelFns: map[logicalcluster.Name]context.CancelFunc{}, + }, nil +} + +// Run starts the provider and blocks. 
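+// It watches LogicalCluster objects through the shared wildcard cache, engages a
+// scoped cluster for every logical cluster whose status still carries the configured
+// initializer, and disengages that cluster again once the initializer is removed or
+// the LogicalCluster is deleted.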
+func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error { + g, ctx := errgroup.WithContext(ctx) + inf, err := p.wildcardCache.GetInformer(ctx, &kcpcorev1alpha1.LogicalCluster{}, cache.BlockUntilSynced(true)) + if err != nil { + return fmt.Errorf("failed to get informer: %w", err) + } + + if _, err := inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + p.handleLogicalClusterEvent(ctx, obj, mgr) + }, + UpdateFunc: func(_, newObj any) { + p.handleLogicalClusterEvent(ctx, newObj, mgr) + }, + DeleteFunc: func(obj any) { + cobj, ok := obj.(client.Object) + if !ok { + tombstone, ok := obj.(toolscache.DeletedFinalStateUnknown) + if !ok { + klog.Errorf("Couldn't get object from tombstone %#v", obj) + return + } + cobj, ok = tombstone.Obj.(client.Object) + if !ok { + klog.Errorf("Tombstone contained object that is not expected %#v", obj) + return + } + } + // check if there is no object left in the index + if lc, ok := cobj.(*kcpcorev1alpha1.LogicalCluster); ok { + clusterName := logicalcluster.From(lc) + p.lock.Lock() + cancel, ok := p.cancelFns[clusterName] + if ok { + p.log.Info("disengaging non-initializing workspace", "cluster", clusterName) + cancel() + delete(p.cancelFns, clusterName) + delete(p.clusters, clusterName) + } + p.lock.Unlock() + } else { + klog.Errorf("unexpected object type %T, expected LogicalCluster", cobj) + } + }, + }); err != nil { + return fmt.Errorf("failed to add EventHandler: %w", err) + } + + g.Go(func() error { + err := p.wildcardCache.Start(ctx) + if err != nil { + return err + } + return nil + }) + + syncCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + if _, err := p.wildcardCache.GetInformer(syncCtx, &kcpcorev1alpha1.LogicalCluster{}, cache.BlockUntilSynced(true)); err != nil { + return fmt.Errorf("failed to sync informer: %w", err) + } + + return g.Wait() +} + +// handleLogicalClusterEvent processes LogicalCluster events and engages initializing workspaces +func (p *Provider) handleLogicalClusterEvent(ctx context.Context, obj any, mgr mcmanager.Manager) { + cobj, ok := obj.(client.Object) + if !ok { + klog.Errorf("unexpected object type %T", obj) + return + } + + lc, ok := cobj.(*kcpcorev1alpha1.LogicalCluster) + if !ok { + klog.Errorf("unexpected object type %T, expected LogicalCluster", cobj) + return + } + + // Check if our initializer is in the initializers list + hasInitializer := slices.Contains(lc.Status.Initializers, kcpcorev1alpha1.LogicalClusterInitializer(p.initializerName)) + clusterName := logicalcluster.From(cobj) + // If our initializer is not present, we need to disengage the cluster if it exists + if !hasInitializer { + p.lock.Lock() + defer p.lock.Unlock() + cancel, ok := p.cancelFns[clusterName] + if ok { + p.log.Info("disengaging non-initializing workspace", "cluster", clusterName) + cancel() + delete(p.cancelFns, clusterName) + delete(p.clusters, clusterName) + } + return + } + + // fast path: cluster exists already, there is nothing to do. + p.lock.RLock() + if _, ok := p.clusters[clusterName]; ok { + p.lock.RUnlock() + return + } + p.lock.RUnlock() + + // slow path: take write lock to add a new cluster (unless it appeared in the meantime). + p.lock.Lock() + if _, ok := p.clusters[clusterName]; ok { + p.lock.Unlock() + return + } + + p.log.Info("LogicalCluster added", "object", clusterName) + // create new specific cluster with correct host endpoint for fetching specific logical cluster. 
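+	// The provider config points at the initializingworkspaces virtual workspace with
+	// a wildcard /clusters/* suffix; rewrite it to /clusters/<name> so that the scoped
+	// cluster only talks to this single logical cluster.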
+	clusterCtx, cancel := context.WithCancel(ctx)
+
+	cfg := rest.CopyConfig(p.config)
+	host := cfg.Host
+	host = strings.TrimSuffix(host, "/clusters/*")
+	cfg.Host = fmt.Sprintf("%s/clusters/%s", host, clusterName)
+
+	cl, err := mcpcache.NewScopedInitializingCluster(cfg, clusterName, p.wildcardCache, p.scheme)
+	if err != nil {
+		p.log.Error(err, "failed to create cluster", "cluster", clusterName)
+		cancel()
+		p.lock.Unlock()
+		return
+	}
+	p.clusters[clusterName] = cl
+	p.cancelFns[clusterName] = cancel
+	p.lock.Unlock()
+
+	p.log.Info("engaging cluster", "cluster", clusterName)
+	if err := mgr.Engage(clusterCtx, clusterName.String(), cl); err != nil {
+		p.log.Error(err, "failed to engage cluster", "cluster", clusterName)
+		p.lock.Lock()
+		cancel()
+		if p.clusters[clusterName] == cl {
+			delete(p.clusters, clusterName)
+			delete(p.cancelFns, clusterName)
+		}
+		p.lock.Unlock()
+		return
+	}
+	p.log.Info("engaged and registered cluster", "cluster", clusterName)
+}
+
+// Get returns a [cluster.Cluster] by logical cluster name.
+func (p *Provider) Get(_ context.Context, name string) (cluster.Cluster, error) {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+	if cl, ok := p.clusters[logicalcluster.Name(name)]; ok {
+		return cl, nil
+	}
+
+	return nil, multicluster.ErrClusterNotFound
+}
+
+// IndexField indexes the given object by the given field on all engaged clusters, current and future.
+func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
+	return p.wildcardCache.IndexField(ctx, obj, field, extractValue)
+}
diff --git a/apiexport/cache.go b/internal/cache/cache.go
similarity index 95%
rename from apiexport/cache.go
rename to internal/cache/cache.go
index bf6ea0a..19d0b2b 100644
--- a/apiexport/cache.go
+++ b/internal/cache/cache.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package apiexport
+package cache
 
 import (
 	"context"
@@ -38,21 +38,24 @@ type scopedCache struct {
 	clusterName logicalcluster.Name
 }
 
+// Start implements cache.Cache, but a scoped cache cannot be started on its own; its wildcard base is started instead.
 func (c *scopedCache) Start(ctx context.Context) error {
 	return errors.New("scoped cache cannot be started")
 }
 
+// WaitForCacheSync waits for the underlying wildcard cache to be synced.
 func (c *scopedCache) WaitForCacheSync(ctx context.Context) bool {
 	return c.base.WaitForCacheSync(ctx)
}
 
+// IndexField adds a field index to the underlying wildcard cache.
 func (c *scopedCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
 	return c.base.IndexField(ctx, obj, field, extractValue)
 }
 
 // Get returns a single object from the cache.
 func (c *scopedCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
-	inf, gvk, scope, err := c.base.getSharedInformer(obj)
+	inf, gvk, scope, err := c.base.GetSharedInformer(obj)
 	if err != nil {
 		return fmt.Errorf("failed to get informer for %T %s: %w", obj, obj.GetObjectKind().GroupVersionKind(), err)
 	}
@@ -70,7 +73,7 @@ func (c *scopedCache) Get(ctx context.Context, key client.ObjectKey, obj client.
 // List returns a list of objects from the cache.
func (c *scopedCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { - inf, gvk, scope, err := c.base.getSharedInformer(list) + inf, gvk, scope, err := c.base.GetSharedInformer(list) if err != nil { return fmt.Errorf("failed to get informer for %T %s: %w", list, list.GetObjectKind().GroupVersionKind(), err) } diff --git a/apiexport/cluster.go b/internal/cache/cluster.go similarity index 51% rename from apiexport/cluster.go rename to internal/cache/cluster.go index f28b2ae..a641a22 100644 --- a/apiexport/cluster.go +++ b/internal/cache/cluster.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apiexport +package cache import ( "context" @@ -35,7 +35,8 @@ import ( "github.com/kcp-dev/logicalcluster/v3" ) -func newScopedCluster(cfg *rest.Config, clusterName logicalcluster.Name, wildcardCA WildcardCache, scheme *runtime.Scheme) (*scopedCluster, error) { +// NewScopedCluster constructs a new cluster.Cluster that operates on a specific logical cluster. +func NewScopedCluster(cfg *rest.Config, clusterName logicalcluster.Name, wildcardCA WildcardCache, scheme *runtime.Scheme) (*ScopedCluster, error) { cfg = rest.CopyConfig(cfg) host, err := url.JoinPath(cfg.Host, clusterName.Path().RequestPath()) if err != nil { @@ -64,7 +65,7 @@ func newScopedCluster(cfg *rest.Config, clusterName logicalcluster.Name, wildcar return nil, err } - return &scopedCluster{ + return &ScopedCluster{ clusterName: clusterName, config: cfg, scheme: scheme, @@ -75,10 +76,52 @@ func newScopedCluster(cfg *rest.Config, clusterName logicalcluster.Name, wildcar }, nil } -var _ cluster.Cluster = &scopedCluster{} +// NewScopedInitializingCluster constructs a new cluster.Cluster that operates on a specific logical cluster +// for initializing workspaces. It doesn't construct client to read from wildcard cache. +func NewScopedInitializingCluster(cfg *rest.Config, clusterName logicalcluster.Name, wildcardCA WildcardCache, scheme *runtime.Scheme) (*ScopedCluster, error) { + cfg = rest.CopyConfig(cfg) + host, err := url.JoinPath(cfg.Host, clusterName.Path().RequestPath()) + if err != nil { + return nil, fmt.Errorf("failed to construct scoped cluster URL: %w", err) + } + cfg.Host = host + + // construct a scoped cache that uses the wildcard cache as base. + ca := &scopedCache{ + base: wildcardCA, + clusterName: clusterName, + } + + cli, err := client.New(cfg, client.Options{Scheme: scheme}) + if err != nil { + return nil, err + } + + httpClient, err := rest.HTTPClientFor(cfg) + if err != nil { + return nil, err + } + + mapper, err := apiutil.NewDynamicRESTMapper(cfg, httpClient) + if err != nil { + return nil, err + } + + return &ScopedCluster{ + clusterName: clusterName, + config: cfg, + scheme: scheme, + client: cli, + httpClient: httpClient, + mapper: mapper, + cache: ca, + }, nil +} + +var _ cluster.Cluster = &ScopedCluster{} -// scopedCluster is a cluster that operates on a specific namespace. -type scopedCluster struct { +// ScopedCluster is a cluster that operates on a specific namespace. +type ScopedCluster struct { clusterName logicalcluster.Name scheme *runtime.Scheme @@ -89,47 +132,52 @@ type scopedCluster struct { cache cache.Cache } -func (c *scopedCluster) GetHTTPClient() *http.Client { +// GetHTTPClient returns the HTTP client scoped to the cluster. 
+func (c *ScopedCluster) GetHTTPClient() *http.Client { return c.httpClient } -func (c *scopedCluster) GetConfig() *rest.Config { +// GetConfig returns the rest.Config scoped to the cluster. +func (c *ScopedCluster) GetConfig() *rest.Config { return c.config } -func (c *scopedCluster) GetScheme() *runtime.Scheme { +// GetScheme returns the scheme scoped to the cluster. +func (c *ScopedCluster) GetScheme() *runtime.Scheme { return c.scheme } -func (c *scopedCluster) GetFieldIndexer() client.FieldIndexer { +// GetFieldIndexer returns a FieldIndexer scoped to the cluster. +func (c *ScopedCluster) GetFieldIndexer() client.FieldIndexer { return c.cache } -func (c *scopedCluster) GetRESTMapper() meta.RESTMapper { +// GetRESTMapper returns a RESTMapper scoped to the cluster. +func (c *ScopedCluster) GetRESTMapper() meta.RESTMapper { return c.mapper } // GetCache returns a cache.Cache. -func (c *scopedCluster) GetCache() cache.Cache { +func (c *ScopedCluster) GetCache() cache.Cache { return c.cache } // GetClient returns a client scoped to the namespace. -func (c *scopedCluster) GetClient() client.Client { +func (c *ScopedCluster) GetClient() client.Client { return c.client } // GetEventRecorderFor returns a new EventRecorder for the provided name. -func (c *scopedCluster) GetEventRecorderFor(name string) record.EventRecorder { +func (c *ScopedCluster) GetEventRecorderFor(name string) record.EventRecorder { panic("implement me") } // GetAPIReader returns a reader against the cluster. -func (c *scopedCluster) GetAPIReader() client.Reader { +func (c *ScopedCluster) GetAPIReader() client.Reader { return c.cache } // Start starts the cluster. -func (c *scopedCluster) Start(ctx context.Context) error { +func (c *ScopedCluster) Start(ctx context.Context) error { return errors.New("scoped cluster cannot be started") } diff --git a/apiexport/forked_cache_reader.go b/internal/cache/forked_cache_reader.go similarity index 99% rename from apiexport/forked_cache_reader.go rename to internal/cache/forked_cache_reader.go index 9f9b006..b8f5026 100644 --- a/apiexport/forked_cache_reader.go +++ b/internal/cache/forked_cache_reader.go @@ -17,7 +17,7 @@ limitations under the License. // This file has been forked from https://github.com/kubernetes-sigs/controller-runtime/blob/78b3ce63cf927debb122dd641290a89d20d776e3/pkg/cache/internal/cache_reader.go. // It's been modified to allow scoping a CacheReader to a specific logical cluster. -package apiexport +package cache import ( "context" diff --git a/apiexport/indexes.go b/internal/cache/indexes.go similarity index 98% rename from apiexport/indexes.go rename to internal/cache/indexes.go index 6c5d50a..b8fc9ac 100644 --- a/apiexport/indexes.go +++ b/internal/cache/indexes.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apiexport +package cache import ( "fmt" diff --git a/apiexport/indexes_test.go b/internal/cache/indexes_test.go similarity index 99% rename from apiexport/indexes_test.go rename to internal/cache/indexes_test.go index ef99407..9781087 100644 --- a/apiexport/indexes_test.go +++ b/internal/cache/indexes_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package apiexport +package cache import ( "testing" diff --git a/apiexport/wildcard.go b/internal/cache/wildcard.go similarity index 97% rename from apiexport/wildcard.go rename to internal/cache/wildcard.go index 0605056..30205fa 100644 --- a/apiexport/wildcard.go +++ b/internal/cache/wildcard.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apiexport +package cache import ( "context" @@ -43,7 +43,7 @@ import ( // WildcardCache is a cache that operates on a '/clusters/*' endpoint. type WildcardCache interface { cache.Cache - getSharedInformer(obj runtime.Object) (k8scache.SharedIndexInformer, schema.GroupVersionKind, apimeta.RESTScopeName, error) + GetSharedInformer(obj runtime.Object) (k8scache.SharedIndexInformer, schema.GroupVersionKind, apimeta.RESTScopeName, error) } // NewWildcardCache returns a cache.Cache that handles multi-cluster watches @@ -127,7 +127,7 @@ type wildcardCache struct { readerFailOnMissingInformer bool } -func (c *wildcardCache) getSharedInformer(obj runtime.Object) (k8scache.SharedIndexInformer, schema.GroupVersionKind, apimeta.RESTScopeName, error) { +func (c *wildcardCache) GetSharedInformer(obj runtime.Object) (k8scache.SharedIndexInformer, schema.GroupVersionKind, apimeta.RESTScopeName, error) { gvk, err := apiutil.GVKForObject(obj, c.scheme) if err != nil { return nil, gvk, "", fmt.Errorf("failed to get GVK for object: %w", err) diff --git a/test/e2e/apiexport_test.go b/test/e2e/apiexport_test.go index e1b5624..cee7cbf 100644 --- a/test/e2e/apiexport_test.go +++ b/test/e2e/apiexport_test.go @@ -55,7 +55,7 @@ import ( . "github.com/onsi/gomega" ) -var _ = Describe("VirtualWorkspace Provider", Ordered, func() { +var _ = Describe("APIExport Provider", Ordered, func() { var ( ctx context.Context cancel context.CancelFunc diff --git a/test/e2e/initializingworkspaces_test.go b/test/e2e/initializingworkspaces_test.go new file mode 100644 index 0000000..102caa8 --- /dev/null +++ b/test/e2e/initializingworkspaces_test.go @@ -0,0 +1,245 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package e2e + +import ( + "context" + "fmt" + "slices" + "time" + + "golang.org/x/sync/errgroup" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder" + mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" + + "github.com/kcp-dev/kcp/sdk/apis/core" + kcpcorev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" + "github.com/kcp-dev/kcp/sdk/apis/tenancy/initialization" + tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1" + "github.com/kcp-dev/logicalcluster/v3" + + clusterclient "github.com/kcp-dev/multicluster-provider/client" + "github.com/kcp-dev/multicluster-provider/envtest" + "github.com/kcp-dev/multicluster-provider/initializingworkspaces" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("InitializingWorkspaces Provider", Ordered, func() { + const initName = "root:e2e-test-ws-type" + const workspaceTypeName = "e2e-test-ws-type" + + var ( + ctx context.Context + cancel context.CancelFunc + + cli clusterclient.ClusterClient + ws1Path, ws2Path logicalcluster.Path + ws1, ws2 *tenancyv1alpha1.Workspace + mgr mcmanager.Manager + ) + + BeforeAll(func() { + ctx, cancel = context.WithCancel(context.Background()) + + var err error + cli, err = clusterclient.New(kcpConfig, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + // Create test workspaces with the WorkspaceType that has our initializer + By("creating WorkspaceType with initializer") + workspaceType := &tenancyv1alpha1.WorkspaceType{ + ObjectMeta: metav1.ObjectMeta{ + Name: workspaceTypeName, + }, + Spec: tenancyv1alpha1.WorkspaceTypeSpec{ + Initializer: true, + }, + } + err = cli.Cluster(core.RootCluster.Path()).Create(ctx, workspaceType) + Expect(err).NotTo(HaveOccurred()) + + // Wait for the WorkspaceType to be ready + envtest.Eventually(GinkgoT(), func() (bool, string) { + wt := &tenancyv1alpha1.WorkspaceType{} + err := cli.Cluster(core.RootCluster.Path()).Get(ctx, client.ObjectKey{Name: workspaceTypeName}, wt) + if err != nil { + return false, fmt.Sprintf("failed to get WorkspaceType: %v", err) + } + return len(wt.Status.VirtualWorkspaces) > 0, fmt.Sprintf("WorkspaceType not ready: %v", wt.Status) + }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to wait for WorkspaceType to be ready") + + By("creating Workspaces with the WorkspaceType with initializers") + ws1, ws1Path = envtest.NewInitializingWorkspaceFixture(GinkgoT(), cli, core.RootCluster.Path(), + envtest.WithNamePrefix("init-ws1"), + envtest.WithType(core.RootCluster.Path(), tenancyv1alpha1.WorkspaceTypeName(workspaceTypeName))) + + ws2, ws2Path = envtest.NewInitializingWorkspaceFixture(GinkgoT(), cli, core.RootCluster.Path(), + envtest.WithNamePrefix("init-ws2"), + envtest.WithType(core.RootCluster.Path(), tenancyv1alpha1.WorkspaceTypeName(workspaceTypeName))) + }) + + It("sees both clusters with initializers", func() { + By("getting LogicalCluster for workspaces and their cluster names") + lc1 := &kcpcorev1alpha1.LogicalCluster{} + err := cli.Cluster(ws1Path).Get(ctx, client.ObjectKey{Name: "cluster"}, lc1) + Expect(err).NotTo(HaveOccurred()) + + lc2 := &kcpcorev1alpha1.LogicalCluster{} + err = cli.Cluster(ws2Path).Get(ctx, 
client.ObjectKey{Name: "cluster"}, lc2) + Expect(err).NotTo(HaveOccurred()) + envtest.Eventually(GinkgoT(), func() (bool, string) { + return slices.Contains(lc1.Status.Initializers, kcpcorev1alpha1.LogicalClusterInitializer(initName)) && slices.Contains(lc2.Status.Initializers, kcpcorev1alpha1.LogicalClusterInitializer(initName)), + fmt.Sprintf("Initializer not set: %v", lc1.Status) + " " + fmt.Sprintf("%v", lc2.Status) + }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see initializers in both clusters") + }) + + Describe("with a multicluster provider and manager", func() { + var ( + engaged = sets.NewString() + p *initializingworkspaces.Provider + g *errgroup.Group + cancelGroup context.CancelFunc + initializersRemoved = sets.NewString() + ) + + BeforeAll(func() { + cli, err := clusterclient.New(kcpConfig, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + By("creating a multicluster provider for initializing workspaces") + + // Get the initializing workspaces virtual workspace URL + wt := &tenancyv1alpha1.WorkspaceType{} + err = cli.Cluster(core.RootCluster.Path()).Get(ctx, client.ObjectKey{Name: workspaceTypeName}, wt) + Expect(err).NotTo(HaveOccurred()) + Expect(wt.Status.VirtualWorkspaces).ToNot(BeEmpty()) + + vwConfig := rest.CopyConfig(kcpConfig) + vwConfig.Host = wt.Status.VirtualWorkspaces[0].URL + + // Create the provider with the initializer name from the WorkspaceType + p, err = initializingworkspaces.New(vwConfig, initializingworkspaces.Options{ + InitializerName: initName, + }) + Expect(err).NotTo(HaveOccurred()) + + By("creating a manager that uses the provider") + mgr, err = mcmanager.New(vwConfig, p, mcmanager.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("creating a reconciler for LogicalClusters") + err = mcbuilder.ControllerManagedBy(mgr). + Named("kcp-initializer-controller"). + For(&kcpcorev1alpha1.LogicalCluster{}). 
+ Complete(mcreconcile.Func( + func(ctx context.Context, request mcreconcile.Request) (ctrl.Result, error) { + By(fmt.Sprintf("reconciling LogicalCluster %s in cluster %q", request.Name, request.ClusterName)) + cl, err := mgr.GetCluster(ctx, request.ClusterName) + if err != nil { + return reconcile.Result{}, err + } + + clusterClient := cl.GetClient() + lc := &kcpcorev1alpha1.LogicalCluster{} + if err := clusterClient.Get(ctx, request.NamespacedName, lc); err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get logical cluster: %w", err) + } + + engaged.Insert(request.ClusterName) + initializer := kcpcorev1alpha1.LogicalClusterInitializer(initName) + + if slices.Contains(lc.Status.Initializers, initializer) { + By(fmt.Sprintf("removing initializer %q from LogicalCluster %s in cluster %q", initName, request.Name, request.ClusterName)) + + patch := client.MergeFrom(lc.DeepCopy()) + lc.Status.Initializers = initialization.EnsureInitializerAbsent(initializer, lc.Status.Initializers) + if err := clusterClient.Status().Patch(ctx, lc, patch); err != nil { + return reconcile.Result{}, err + } + initializersRemoved.Insert(request.ClusterName) + } + return reconcile.Result{}, nil + })) + Expect(err).NotTo(HaveOccurred()) + + By("starting the provider and manager") + var groupContext context.Context + groupContext, cancelGroup = context.WithCancel(ctx) + g, groupContext = errgroup.WithContext(groupContext) + g.Go(func() error { + return p.Run(groupContext, mgr) + }) + g.Go(func() error { + return mgr.Start(groupContext) + }) + }) + It("engages both Logical Clusters with initializers", func() { + envtest.Eventually(GinkgoT(), func() (bool, string) { + return engaged.Has(ws1.Spec.Cluster), fmt.Sprintf("failed to see workspace %q engaged as a cluster: %v", ws1.Spec.Cluster, engaged.List()) + }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see workspace %q engaged as a cluster: %v", ws1.Spec.Cluster, engaged.List()) + + envtest.Eventually(GinkgoT(), func() (bool, string) { + return engaged.Has(ws2.Spec.Cluster), fmt.Sprintf("failed to see workspace %q engaged as a cluster: %v", ws2.Spec.Cluster, engaged.List()) + }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see workspace %q engaged as a cluster: %v", ws2.Spec.Cluster, engaged.List()) + }) + + It("removes initializers from the both clusters after engaging", func() { + envtest.Eventually(GinkgoT(), func() (bool, string) { + return initializersRemoved.Has(ws1.Spec.Cluster), fmt.Sprintf("failed to see removed initializer from %q cluster: %v", ws1.Spec.Cluster, initializersRemoved.List()) + }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see removed initializer from %q cluster: %v", ws1.Spec.Cluster, initializersRemoved.List()) + + envtest.Eventually(GinkgoT(), func() (bool, string) { + return initializersRemoved.Has(ws2.Spec.Cluster), fmt.Sprintf("failed to see removed initializer from %q cluster: %v", ws2.Spec.Cluster, initializersRemoved.List()) + }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see removed initializer from %q cluster: %v", ws2.Spec.Cluster, initializersRemoved.List()) + + By("checking if LogicalClusters objects have no initializers left") + var err error + lc1 := &kcpcorev1alpha1.LogicalCluster{} + err = cli.Cluster(ws1Path).Get(ctx, client.ObjectKey{Name: "cluster"}, lc1) + Expect(err).NotTo(HaveOccurred()) + + lc2 := &kcpcorev1alpha1.LogicalCluster{} + err = cli.Cluster(ws2Path).Get(ctx, client.ObjectKey{Name: "cluster"}, lc2) + Expect(err).NotTo(HaveOccurred()) + 
envtest.Eventually(GinkgoT(), func() (bool, string) {
+				if err := cli.Cluster(ws1Path).Get(ctx, client.ObjectKey{Name: "cluster"}, lc1); err != nil {
+					return false, fmt.Sprintf("failed to get LogicalCluster in %s: %v", ws1Path, err)
+				}
+				if err := cli.Cluster(ws2Path).Get(ctx, client.ObjectKey{Name: "cluster"}, lc2); err != nil {
+					return false, fmt.Sprintf("failed to get LogicalCluster in %s: %v", ws2Path, err)
+				}
+				return !slices.Contains(lc1.Status.Initializers, kcpcorev1alpha1.LogicalClusterInitializer(initName)) && !slices.Contains(lc2.Status.Initializers, kcpcorev1alpha1.LogicalClusterInitializer(initName)),
+					fmt.Sprintf("initializer still present: %v %v", lc1.Status, lc2.Status)
+			}, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see removed initializers in both clusters")
+		})
+
+		AfterAll(func() {
+			cancelGroup()
+			err := g.Wait()
+			Expect(err).NotTo(HaveOccurred())
+		})
+	})
+
+	AfterAll(func() {
+		cancel()
+	})
+})