From 9aba3a0c6387cc871c90979e7f3cd7ea3bd182a2 Mon Sep 17 00:00:00 2001 From: rafia sabih Date: Fri, 14 Jun 2024 14:39:11 +0200 Subject: [PATCH 01/11] Add separate PVC for wal Define WalPvc filed with the name of the pvc and the volume specifications to create a separate pvc for wal files. --- .../cpo.opensource.cybertec.at/v1/crds.go | 79 +++++++++++++++++++ .../v1/postgresql_type.go | 6 ++ .../v1/zz_generated.deepcopy.go | 22 ++++++ pkg/cluster/k8sres.go | 16 +++- .../clientset/versioned/fake/register.go | 14 ++-- .../clientset/versioned/scheme/register.go | 14 ++-- 6 files changed, 135 insertions(+), 16 deletions(-) diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go index 41134647..f561a47e 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go @@ -1347,6 +1347,85 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ }, }, }, + "walPvc": { + Type: "object", + Nullable: true, + Properties: map[string]apiextv1.JSONSchemaProps{ + "waldir": { + Type: "string", + }, + "walvolume": { + Type: "object", + Required: []string{"size"}, + Properties: map[string]apiextv1.JSONSchemaProps{ + "iops": { + Type: "integer", + }, + "selector": { + Type: "object", + Properties: map[string]apiextv1.JSONSchemaProps{ + "matchExpressions": { + Type: "array", + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "object", + Required: []string{"key", "operator"}, + Properties: map[string]apiextv1.JSONSchemaProps{ + "key": { + Type: "string", + }, + "operator": { + Type: "string", + Enum: []apiextv1.JSON{ + { + Raw: []byte(`"DoesNotExist"`), + }, + { + Raw: []byte(`"Exists"`), + }, + { + Raw: []byte(`"In"`), + }, + { + Raw: []byte(`"NotIn"`), + }, + }, + }, + "values": { + Type: "array", + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "string", + }, + }, + }, + }, + }, + }, + }, + "matchLabels": { + Type: "object", + XPreserveUnknownFields: util.True(), + }, + }, + }, + "size": { + Type: "string", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + "storageClass": { + Type: "string", + }, + "subPath": { + Type: "string", + }, + "throughput": { + Type: "integer", + }, + }, + }, + }, + }, }, }, "status": { diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go index 98b9e3ce..750e0ae7 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go @@ -96,6 +96,12 @@ type PostgresSpec struct { Backup *Backup `json:"backup,omitempty"` TDE *TDE `json:"tde,omitempty"` Monitoring *Monitoring `json:"monitor,omitempty"` + WalPvc *PVCVolume `json:"walPvc,omitempty"` +} + +type PVCVolume struct { + WalVolume Volume `json:"walvolume,omitempty"` + WalDir string `json:"waldir,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go b/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go index 662d4f78..2891a964 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go @@ -583,6 +583,23 @@ func (in *OperatorTimeouts) DeepCopy() *OperatorTimeouts { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PVCVolume) DeepCopyInto(out *PVCVolume) { + *out = *in + in.WalVolume.DeepCopyInto(&out.WalVolume) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PVCVolume. +func (in *PVCVolume) DeepCopy() *PVCVolume { + if in == nil { + return nil + } + out := new(PVCVolume) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Patroni) DeepCopyInto(out *Patroni) { *out = *in @@ -965,6 +982,11 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { *out = new(Monitoring) **out = **in } + if in.WalPvc != nil { + in, out := &in.WalPvc, &out.WalPvc + *out = new(PVCVolume) + (*in).DeepCopyInto(*out) + } return } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index c331a343..ccd7068f 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1290,9 +1290,9 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu sidecarContainers []v1.Container podTemplate *v1.PodTemplateSpec volumeClaimTemplate *v1.PersistentVolumeClaim + WalPvcClaim *v1.PersistentVolumeClaim additionalVolumes = spec.AdditionalVolumes ) - defaultResources := makeDefaultResources(&c.OpConfig) resourceRequirements, err := c.generateResourceRequirements( spec.Resources, defaultResources, constants.PostgresContainerName) @@ -1333,6 +1333,9 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu if spec.TDE != nil && spec.TDE.Enable { enableTDE = true } + if spec.WalPvc != nil { + spec.PostgresqlParam.Parameters["basebackup"] = "- waldir: " + spec.WalPvc.WalDir + } spiloConfiguration, err := generateSpiloJSONConfiguration(&spec.PostgresqlParam, &spec.Patroni, &c.OpConfig, enableTDE, c.logger) if err != nil { return nil, fmt.Errorf("could not generate Spilo JSON configuration: %v", err) @@ -1509,6 +1512,15 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu additionalVolumes = append(additionalVolumes, c.generateCertSecretVolume()) } } + if spec.WalPvc != nil { + WalPvcClaim, err = c.generatePersistentVolumeClaimTemplate(spec.WalPvc.WalVolume.Size, + spec.WalPvc.WalVolume.StorageClass, spec.WalPvc.WalVolume.Selector, spec.WalPvc.WalDir) + if err != nil { + c.logger.Errorf("could not generate volume claim template for WAL directory: %v", err) + } + } else { + WalPvcClaim = nil + } // generate pod template for the statefulset, based on the spilo container and sidecars podTemplate, err = c.generatePodTemplate( @@ -1590,7 +1602,7 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu Selector: c.labelsSelector(TYPE_POSTGRESQL), ServiceName: c.serviceName(Master), Template: *podTemplate, - VolumeClaimTemplates: []v1.PersistentVolumeClaim{*volumeClaimTemplate}, + VolumeClaimTemplates: []v1.PersistentVolumeClaim{*volumeClaimTemplate, *WalPvcClaim}, UpdateStrategy: updateStrategy, PodManagementPolicy: podManagementPolicy, PersistentVolumeClaimRetentionPolicy: &persistentVolumeClaimRetentionPolicy, diff --git a/pkg/generated/clientset/versioned/fake/register.go b/pkg/generated/clientset/versioned/fake/register.go index 061b3064..9ac0f665 100644 --- a/pkg/generated/clientset/versioned/fake/register.go +++ b/pkg/generated/clientset/versioned/fake/register.go @@ -45,14 +45,14 @@ var localSchemeBuilder = runtime.SchemeBuilder{ // AddToScheme adds all types of this clientset into the given scheme. This allows composition // of clientsets, like in: // -// import ( -// "k8s.io/client-go/kubernetes" -// clientsetscheme "k8s.io/client-go/kubernetes/scheme" -// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" -// ) +// import ( +// "k8s.io/client-go/kubernetes" +// clientsetscheme "k8s.io/client-go/kubernetes/scheme" +// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" +// ) // -// kclientset, _ := kubernetes.NewForConfig(c) -// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) +// kclientset, _ := kubernetes.NewForConfig(c) +// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) // // After this, RawExtensions in Kubernetes types will serialize kube-aggregator types // correctly. diff --git a/pkg/generated/clientset/versioned/scheme/register.go b/pkg/generated/clientset/versioned/scheme/register.go index 9ead92b8..99ebde68 100644 --- a/pkg/generated/clientset/versioned/scheme/register.go +++ b/pkg/generated/clientset/versioned/scheme/register.go @@ -45,14 +45,14 @@ var localSchemeBuilder = runtime.SchemeBuilder{ // AddToScheme adds all types of this clientset into the given scheme. This allows composition // of clientsets, like in: // -// import ( -// "k8s.io/client-go/kubernetes" -// clientsetscheme "k8s.io/client-go/kubernetes/scheme" -// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" -// ) +// import ( +// "k8s.io/client-go/kubernetes" +// clientsetscheme "k8s.io/client-go/kubernetes/scheme" +// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" +// ) // -// kclientset, _ := kubernetes.NewForConfig(c) -// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) +// kclientset, _ := kubernetes.NewForConfig(c) +// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) // // After this, RawExtensions in Kubernetes types will serialize kube-aggregator types // correctly. From babbf11b5f1de9c1c0fabdf78de38fa67524ae3b Mon Sep 17 00:00:00 2001 From: rafia sabih Date: Mon, 24 Jun 2024 13:04:51 +0200 Subject: [PATCH 02/11] add volume mounting --- pkg/cluster/k8sres.go | 26 +++++++++++++++++++++----- pkg/cluster/sync.go | 2 +- pkg/util/constants/postgresql.go | 2 ++ 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index ccd7068f..6125ec46 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -428,6 +428,7 @@ PatroniInitDBParams: if len(local) > 0 { config.PgLocalConfiguration[constants.PatroniPGParametersParameterName] = local + logger.Infof("+-+--+-+--+--+-- Recevived local params are %v", local) } if len(bootstrap) > 0 { config.Bootstrap.DCS.PGBootstrapConfiguration = make(map[string]interface{}) @@ -659,6 +660,14 @@ func generateVolumeMounts(volume cpov1.Volume) []v1.VolumeMount { } } +func generateWalVolumeMounts(volume cpov1.Volume, dir string) v1.VolumeMount { + return v1.VolumeMount{ + Name: dir, + MountPath: constants.PostgresWalMount, //TODO: fetch from manifest + SubPath: volume.SubPath, + } +} + func generateContainer( name string, dockerImage *string, @@ -1334,12 +1343,13 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu enableTDE = true } if spec.WalPvc != nil { - spec.PostgresqlParam.Parameters["basebackup"] = "- waldir: " + spec.WalPvc.WalDir + spec.PostgresqlParam.Parameters["basebackup"] = "- waldir: \n\t " + spec.WalPvc.WalDir } spiloConfiguration, err := generateSpiloJSONConfiguration(&spec.PostgresqlParam, &spec.Patroni, &c.OpConfig, enableTDE, c.logger) if err != nil { return nil, fmt.Errorf("could not generate Spilo JSON configuration: %v", err) } + c.logger.Infof("######## params are %v", spec.PostgresqlParam) // generate environment variables for the spilo container spiloEnvVars, err := c.generateSpiloPodEnvVars(spec, c.Postgresql.GetUID(), spiloConfiguration) @@ -1368,6 +1378,10 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu volumeMounts := generateVolumeMounts(spec.Volume) + if spec.WalPvc != nil { + volumeMounts = append(volumeMounts, generateWalVolumeMounts(spec.WalPvc.WalVolume, spec.WalPvc.WalDir)) + } + // configure TLS with a custom secret volume if spec.TLS != nil && spec.TLS.SecretName != "" { getSpiloTLSEnv := func(k string) string { @@ -1518,10 +1532,7 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu if err != nil { c.logger.Errorf("could not generate volume claim template for WAL directory: %v", err) } - } else { - WalPvcClaim = nil } - // generate pod template for the statefulset, based on the spilo container and sidecars podTemplate, err = c.generatePodTemplate( c.Namespace, @@ -1590,6 +1601,11 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu persistentVolumeClaimRetentionPolicy.WhenScaled = appsv1.RetainPersistentVolumeClaimRetentionPolicyType } + final_vols := []v1.PersistentVolumeClaim{*volumeClaimTemplate} + if spec.WalPvc != nil { + final_vols = []v1.PersistentVolumeClaim{*volumeClaimTemplate, *WalPvcClaim} + } + statefulSet := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: c.statefulSetName(), @@ -1602,7 +1618,7 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu Selector: c.labelsSelector(TYPE_POSTGRESQL), ServiceName: c.serviceName(Master), Template: *podTemplate, - VolumeClaimTemplates: []v1.PersistentVolumeClaim{*volumeClaimTemplate, *WalPvcClaim}, + VolumeClaimTemplates: final_vols, //[]v1.PersistentVolumeClaim{*volumeClaimTemplate, *WalPvcClaim}, UpdateStrategy: updateStrategy, PodManagementPolicy: podManagementPolicy, PersistentVolumeClaimRetentionPolicy: &persistentVolumeClaimRetentionPolicy, diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 1c42fcf7..35e73cad 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -1632,7 +1632,7 @@ func (c *Cluster) syncPgbackrestJob(forceRemove bool) error { if err := c.createPgbackrestJob(job); err != nil { return fmt.Errorf("could not create a pgbackrest cronjob: %v", err) } - c.logger.Info("pgbackrest cronjob for %v %v has been successfully created", rep, schedul) + c.logger.Infof("pgbackrest cronjob for %v %v has been successfully created", rep, schedul) } } } diff --git a/pkg/util/constants/postgresql.go b/pkg/util/constants/postgresql.go index 8bd7508a..d32a7d66 100644 --- a/pkg/util/constants/postgresql.go +++ b/pkg/util/constants/postgresql.go @@ -18,4 +18,6 @@ const ( RunVolumeName = "postgresql-run" RunVolumePath = "/var/run/postgresql" + + PostgresWalMount = "/home/postgres/pgwal" ) From 80e718aae77e153707b62507bc40845f24a0d84c Mon Sep 17 00:00:00 2001 From: rafia sabih Date: Wed, 10 Jul 2024 12:58:48 +0200 Subject: [PATCH 03/11] Cosmetic changes --- pkg/cluster/k8sres.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 6125ec46..6aa51c9f 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -428,7 +428,6 @@ PatroniInitDBParams: if len(local) > 0 { config.PgLocalConfiguration[constants.PatroniPGParametersParameterName] = local - logger.Infof("+-+--+-+--+--+-- Recevived local params are %v", local) } if len(bootstrap) > 0 { config.Bootstrap.DCS.PGBootstrapConfiguration = make(map[string]interface{}) @@ -1342,9 +1341,6 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu if spec.TDE != nil && spec.TDE.Enable { enableTDE = true } - if spec.WalPvc != nil { - spec.PostgresqlParam.Parameters["basebackup"] = "- waldir: \n\t " + spec.WalPvc.WalDir - } spiloConfiguration, err := generateSpiloJSONConfiguration(&spec.PostgresqlParam, &spec.Patroni, &c.OpConfig, enableTDE, c.logger) if err != nil { return nil, fmt.Errorf("could not generate Spilo JSON configuration: %v", err) From 8cb61b1536cf868d3af87c88bd14fff97aca2b59 Mon Sep 17 00:00:00 2001 From: rafia sabih Date: Wed, 10 Jul 2024 13:02:04 +0200 Subject: [PATCH 04/11] Add env var --- pkg/cluster/k8sres.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 6aa51c9f..3bdbaab2 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1012,6 +1012,9 @@ func (c *Cluster) generateSpiloPodEnvVars( if spec.Monitoring != nil { envVars = append(envVars, v1.EnvVar{Name: "cpo_monitoring_stack", Value: "true"}) } + if spec.WalPvc != nil { + envVars = append(envVars, v1.EnvVar{Name: "WALDIR", Value: spec.WalPvc.WalDir}) + } if c.OpConfig.EnablePgVersionEnvVar { envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.GetDesiredMajorVersion()}) @@ -1345,7 +1348,6 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu if err != nil { return nil, fmt.Errorf("could not generate Spilo JSON configuration: %v", err) } - c.logger.Infof("######## params are %v", spec.PostgresqlParam) // generate environment variables for the spilo container spiloEnvVars, err := c.generateSpiloPodEnvVars(spec, c.Postgresql.GetUID(), spiloConfiguration) From 1836b37895a698a95585b9c41fc6fc007926966c Mon Sep 17 00:00:00 2001 From: rafia sabih Date: Thu, 1 Aug 2024 10:20:10 +0200 Subject: [PATCH 05/11] Add the sync code --- pkg/cluster/k8sres.go | 3 ++- pkg/cluster/sync.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 3bdbaab2..22e80c1f 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1014,6 +1014,7 @@ func (c *Cluster) generateSpiloPodEnvVars( } if spec.WalPvc != nil { envVars = append(envVars, v1.EnvVar{Name: "WALDIR", Value: spec.WalPvc.WalDir}) + envVars = append(envVars, v1.EnvVar{Name: "OLD_WALDIR", Value: ""}) } if c.OpConfig.EnablePgVersionEnvVar { @@ -1616,7 +1617,7 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu Selector: c.labelsSelector(TYPE_POSTGRESQL), ServiceName: c.serviceName(Master), Template: *podTemplate, - VolumeClaimTemplates: final_vols, //[]v1.PersistentVolumeClaim{*volumeClaimTemplate, *WalPvcClaim}, + VolumeClaimTemplates: final_vols, UpdateStrategy: updateStrategy, PodManagementPolicy: podManagementPolicy, PersistentVolumeClaimRetentionPolicy: &persistentVolumeClaimRetentionPolicy, diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 35e73cad..48f3a6f3 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -230,6 +230,11 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { } } + // sync WAL PVC + if !reflect.DeepEqual(oldSpec.Spec.WalPvc, newSpec.Spec.WalPvc) { + c.syncWalPvc(&oldSpec, newSpec) + } + c.logger.Debug("syncing statefulsets") if err = c.syncStatefulSet(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { @@ -1756,6 +1761,37 @@ func (c *Cluster) syncMonitoringSecret(oldSpec, newSpec *cpov1.Postgresql) error return nil } +func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) { + if oldSpec.Spec.WalPvc != nil && newSpec.Spec.WalPvc == nil { + // if the wal_pvc is removed, then + // 1. Change env-vars + // 2. Remove the PVC + pvcs, err := c.listPersistentVolumeClaims() + if err != nil { + c.logger.Error("Could not list PVCs") + } else { + for _, pvc := range pvcs { + if strings.Contains(pvc.Name, oldSpec.Spec.WalPvc.WalDir) { + c.logger.Debugf("deleting WAL-PVC %q", util.NameFromMeta(pvc.ObjectMeta)) + if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions); err != nil { + c.logger.Warningf("could not delete WAL PVC: %v", err) + } + } + } + containers := c.Statefulset.Spec.Template.Spec.Containers + for _, con := range containers { + con.Env = append(con.Env, v1.EnvVar{Name: "WALDIR", Value: ""}) + con.Env = append(con.Env, v1.EnvVar{Name: "OLD_WALDIR", Value: oldSpec.Spec.WalPvc.WalDir}) + } + } + + } else if oldSpec.Spec.WalPvc == nil && newSpec.Spec.WalPvc != nil { + // if the wal_pvc is added, then + // 1. Create the PVC + // 2. Change env-vars + } +} + func generateRootCertificate( privateKey *ecdsa.PrivateKey, serialNumber *big.Int, ) (*x509.Certificate, error) { From 5b5aca0fa1c67cf6069baa0e63073494032e6508 Mon Sep 17 00:00:00 2001 From: rafia sabih Date: Fri, 2 Aug 2024 16:07:38 +0200 Subject: [PATCH 06/11] Update and sync process for WAL-PVC --- .../cpo.opensource.cybertec.at/v1/crds.go | 3 ++ .../v1/postgresql_type.go | 1 + pkg/cluster/cluster.go | 30 +++++++++++++- pkg/cluster/k8sres.go | 16 +++++--- pkg/cluster/sync.go | 39 +++++++++++-------- 5 files changed, 65 insertions(+), 24 deletions(-) diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go index f561a47e..4d2617b5 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go @@ -1354,6 +1354,9 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ "waldir": { Type: "string", }, + "oldwaldir": { + Type: "string", + }, "walvolume": { Type: "object", Required: []string{"size"}, diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go index 750e0ae7..2915c3f3 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go @@ -102,6 +102,7 @@ type PostgresSpec struct { type PVCVolume struct { WalVolume Volume `json:"walvolume,omitempty"` WalDir string `json:"waldir,omitempty"` + OldWalDir string `json:"oldwaldir,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index d09f4c38..7eecb3e6 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -617,7 +617,16 @@ func (c *Cluster) compareStatefulSetWith(oldSts, newSts *appsv1.StatefulSet) *co needsReplace = true reasons = append(reasons, "new statefulset's volumeClaimTemplates contains different number of volumes to the old one") } - for i := 0; i < len(oldSts.Spec.VolumeClaimTemplates); i++ { + + //Account for the deleted PVC for wal + lenVCT := 0 + if len(oldSts.Spec.VolumeClaimTemplates) < len(newSts.Spec.VolumeClaimTemplates) { + lenVCT = len(oldSts.Spec.VolumeClaimTemplates) + } else { + lenVCT = len(newSts.Spec.VolumeClaimTemplates) + } + + for i := 0; i < lenVCT; i++ { name := oldSts.Spec.VolumeClaimTemplates[i].Name // Some generated fields like creationTimestamp make it not possible to use DeepCompare on ObjectMeta if name != newSts.Spec.VolumeClaimTemplates[i].Name { @@ -1012,8 +1021,17 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { c.syncMonitoringSecret(oldSpec, newSpec) } + //sync WAL-PVC + if !reflect.DeepEqual(oldSpec.Spec.WalPvc, newSpec.Spec.WalPvc) { + if err := c.syncWalPvc(oldSpec, newSpec); err != nil { + c.logger.Warningf("could not sync PVC WAL %v", err) + } + } + //sync sts when there is a change in the pgbackrest secret, since we need to mount this - if !reflect.DeepEqual(oldSpec.Spec.Backup.Pgbackrest.Configuration, newSpec.Spec.Backup.Pgbackrest.Configuration) { + if oldSpec.Spec.Backup != nil && newSpec.Spec.Backup != nil && + oldSpec.Spec.Backup.Pgbackrest != nil && newSpec.Spec.Backup.Pgbackrest != nil && + !reflect.DeepEqual(oldSpec.Spec.Backup.Pgbackrest.Configuration, newSpec.Spec.Backup.Pgbackrest.Configuration) { syncStatefulSet = true } @@ -1068,6 +1086,14 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { updateFailed = true return } + if oldSpec.Spec.WalPvc != nil { + //if pvc wal is removed then carry the relevant env vars to the new sts + c.Spec.WalPvc = &cpov1.PVCVolume{ + OldWalDir: oldSpec.Spec.WalPvc.WalDir, + WalVolume: cpov1.Volume{}, + WalDir: "", + } + } if c.restoreInProgress() { c.applyRestoreStatefulSetSyncOverrides(newSs, oldSs) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 22e80c1f..8e2f175c 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1013,10 +1013,14 @@ func (c *Cluster) generateSpiloPodEnvVars( envVars = append(envVars, v1.EnvVar{Name: "cpo_monitoring_stack", Value: "true"}) } if spec.WalPvc != nil { - envVars = append(envVars, v1.EnvVar{Name: "WALDIR", Value: spec.WalPvc.WalDir}) - envVars = append(envVars, v1.EnvVar{Name: "OLD_WALDIR", Value: ""}) + if spec.WalPvc.WalDir != "" { + envVars = append(envVars, v1.EnvVar{Name: "WALDIR", Value: spec.WalPvc.WalDir}) + envVars = append(envVars, v1.EnvVar{Name: "OLD_WALDIR", Value: ""}) + } else if spec.WalPvc.OldWalDir != "" { + envVars = append(envVars, v1.EnvVar{Name: "WALDIR", Value: ""}) + envVars = append(envVars, v1.EnvVar{Name: "OLD_WALDIR", Value: spec.WalPvc.OldWalDir}) + } } - if c.OpConfig.EnablePgVersionEnvVar { envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.GetDesiredMajorVersion()}) } @@ -1377,7 +1381,7 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu volumeMounts := generateVolumeMounts(spec.Volume) - if spec.WalPvc != nil { + if spec.WalPvc != nil && spec.WalPvc.WalDir != "" { volumeMounts = append(volumeMounts, generateWalVolumeMounts(spec.WalPvc.WalVolume, spec.WalPvc.WalDir)) } @@ -1525,7 +1529,7 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu additionalVolumes = append(additionalVolumes, c.generateCertSecretVolume()) } } - if spec.WalPvc != nil { + if spec.WalPvc != nil && spec.WalPvc.WalDir != "" { WalPvcClaim, err = c.generatePersistentVolumeClaimTemplate(spec.WalPvc.WalVolume.Size, spec.WalPvc.WalVolume.StorageClass, spec.WalPvc.WalVolume.Selector, spec.WalPvc.WalDir) if err != nil { @@ -1601,7 +1605,7 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu } final_vols := []v1.PersistentVolumeClaim{*volumeClaimTemplate} - if spec.WalPvc != nil { + if spec.WalPvc != nil && spec.WalPvc.WalDir != "" { final_vols = []v1.PersistentVolumeClaim{*volumeClaimTemplate, *WalPvcClaim} } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 48f3a6f3..c2e0ad97 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -218,7 +218,7 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { return err } - // sync volume may already transition volumes to gp3, if iops/throughput or type is specified + //sync volume may already transition volumes to gp3, if iops/throughput or type is specified if err = c.syncVolumes(); err != nil { return err } @@ -230,11 +230,6 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { } } - // sync WAL PVC - if !reflect.DeepEqual(oldSpec.Spec.WalPvc, newSpec.Spec.WalPvc) { - c.syncWalPvc(&oldSpec, newSpec) - } - c.logger.Debug("syncing statefulsets") if err = c.syncStatefulSet(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { @@ -292,6 +287,10 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { return fmt.Errorf("could not sync monitoring: %v", err) } + if err = c.syncWalPvc(&oldSpec, newSpec); err != nil { + return fmt.Errorf("could not sync WAL-PVC: %v", err) + } + if len(c.Spec.Streams) > 0 { c.logger.Debug("syncing streams") if err = c.syncStreams(); err != nil { @@ -1761,35 +1760,43 @@ func (c *Cluster) syncMonitoringSecret(oldSpec, newSpec *cpov1.Postgresql) error return nil } -func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) { - if oldSpec.Spec.WalPvc != nil && newSpec.Spec.WalPvc == nil { +func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) error { + c.logger.Info("syncing PVC for WAL with Spec") + c.setProcessName("syncing PVC for WAL") + + if newSpec.Spec.WalPvc == nil && oldSpec.Spec.WalPvc != nil { // if the wal_pvc is removed, then // 1. Change env-vars // 2. Remove the PVC + // 3. Remember the old dir name pvcs, err := c.listPersistentVolumeClaims() if err != nil { - c.logger.Error("Could not list PVCs") + return fmt.Errorf("Could not list PVCs") } else { for _, pvc := range pvcs { + c.logger.Infof("Current PVC name is %v", pvc.Name) if strings.Contains(pvc.Name, oldSpec.Spec.WalPvc.WalDir) { - c.logger.Debugf("deleting WAL-PVC %q", util.NameFromMeta(pvc.ObjectMeta)) + c.logger.Infof("deleting WAL-PVC %q", util.NameFromMeta(pvc.ObjectMeta)) if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions); err != nil { - c.logger.Warningf("could not delete WAL PVC: %v", err) + return fmt.Errorf("could not delete WAL PVC: %v", err) } } } containers := c.Statefulset.Spec.Template.Spec.Containers for _, con := range containers { + c.logger.Infof("changing env-vars for wal pvc %v", con) con.Env = append(con.Env, v1.EnvVar{Name: "WALDIR", Value: ""}) con.Env = append(con.Env, v1.EnvVar{Name: "OLD_WALDIR", Value: oldSpec.Spec.WalPvc.WalDir}) + c.logger.Infof("changed env-vars for wal pvc %v", con.Env) } } - - } else if oldSpec.Spec.WalPvc == nil && newSpec.Spec.WalPvc != nil { - // if the wal_pvc is added, then - // 1. Create the PVC - // 2. Change env-vars + c.Spec.WalPvc = &cpov1.PVCVolume{ + OldWalDir: oldSpec.Spec.WalPvc.WalDir, + WalVolume: cpov1.Volume{}, + WalDir: "", + } } + return nil } func generateRootCertificate( From 4bc1d9c775120d9ce2e16bd20da309bdf0a55446 Mon Sep 17 00:00:00 2001 From: rafia sabih Date: Tue, 6 Aug 2024 12:32:29 +0200 Subject: [PATCH 07/11] Simplify the API User provides only the volume specifics for the PVC. The name of the volume is pre-configured as walpvc-cluster-name. The volume is mounted at constants.PostgresPVCWalMount. --- .../cpo.opensource.cybertec.at/v1/crds.go | 114 ++++++++---------- .../v1/postgresql_type.go | 8 +- .../v1/zz_generated.deepcopy.go | 19 +-- pkg/cluster/cluster.go | 6 +- pkg/cluster/k8sres.go | 29 ++--- pkg/cluster/sync.go | 16 +-- pkg/util/constants/postgresql.go | 3 +- 7 files changed, 75 insertions(+), 120 deletions(-) diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go index 4d2617b5..283ea4d3 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go @@ -1349,84 +1349,72 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ }, "walPvc": { Type: "object", - Nullable: true, + Required: []string{"size"}, Properties: map[string]apiextv1.JSONSchemaProps{ - "waldir": { - Type: "string", - }, - "oldwaldir": { - Type: "string", + "iops": { + Type: "integer", }, - "walvolume": { - Type: "object", - Required: []string{"size"}, + "selector": { + Type: "object", Properties: map[string]apiextv1.JSONSchemaProps{ - "iops": { - Type: "integer", - }, - "selector": { - Type: "object", - Properties: map[string]apiextv1.JSONSchemaProps{ - "matchExpressions": { - Type: "array", - Items: &apiextv1.JSONSchemaPropsOrArray{ - Schema: &apiextv1.JSONSchemaProps{ - Type: "object", - Required: []string{"key", "operator"}, - Properties: map[string]apiextv1.JSONSchemaProps{ - "key": { - Type: "string", + "matchExpressions": { + Type: "array", + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "object", + Required: []string{"key", "operator"}, + Properties: map[string]apiextv1.JSONSchemaProps{ + "key": { + Type: "string", + }, + "operator": { + Type: "string", + Enum: []apiextv1.JSON{ + { + Raw: []byte(`"DoesNotExist"`), }, - "operator": { - Type: "string", - Enum: []apiextv1.JSON{ - { - Raw: []byte(`"DoesNotExist"`), - }, - { - Raw: []byte(`"Exists"`), - }, - { - Raw: []byte(`"In"`), - }, - { - Raw: []byte(`"NotIn"`), - }, - }, + { + Raw: []byte(`"Exists"`), }, - "values": { - Type: "array", - Items: &apiextv1.JSONSchemaPropsOrArray{ - Schema: &apiextv1.JSONSchemaProps{ - Type: "string", - }, - }, + { + Raw: []byte(`"In"`), + }, + { + Raw: []byte(`"NotIn"`), + }, + }, + }, + "values": { + Type: "array", + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "string", }, }, }, }, }, - "matchLabels": { - Type: "object", - XPreserveUnknownFields: util.True(), - }, }, }, - "size": { - Type: "string", - Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", - }, - "storageClass": { - Type: "string", - }, - "subPath": { - Type: "string", - }, - "throughput": { - Type: "integer", + "matchLabels": { + Type: "object", + XPreserveUnknownFields: util.True(), }, }, }, + "size": { + Type: "string", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + "storageClass": { + Type: "string", + }, + "subPath": { + Type: "string", + }, + "throughput": { + Type: "integer", + }, }, }, }, diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go index 2915c3f3..8828c3a4 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go @@ -96,13 +96,7 @@ type PostgresSpec struct { Backup *Backup `json:"backup,omitempty"` TDE *TDE `json:"tde,omitempty"` Monitoring *Monitoring `json:"monitor,omitempty"` - WalPvc *PVCVolume `json:"walPvc,omitempty"` -} - -type PVCVolume struct { - WalVolume Volume `json:"walvolume,omitempty"` - WalDir string `json:"waldir,omitempty"` - OldWalDir string `json:"oldwaldir,omitempty"` + WalPvc *Volume `json:"walPvc,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go b/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go index 2891a964..b87578ef 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go @@ -583,23 +583,6 @@ func (in *OperatorTimeouts) DeepCopy() *OperatorTimeouts { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *PVCVolume) DeepCopyInto(out *PVCVolume) { - *out = *in - in.WalVolume.DeepCopyInto(&out.WalVolume) - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PVCVolume. -func (in *PVCVolume) DeepCopy() *PVCVolume { - if in == nil { - return nil - } - out := new(PVCVolume) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Patroni) DeepCopyInto(out *Patroni) { *out = *in @@ -984,7 +967,7 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { } if in.WalPvc != nil { in, out := &in.WalPvc, &out.WalPvc - *out = new(PVCVolume) + *out = new(Volume) (*in).DeepCopyInto(*out) } return diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 7eecb3e6..533442f3 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1088,11 +1088,7 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { } if oldSpec.Spec.WalPvc != nil { //if pvc wal is removed then carry the relevant env vars to the new sts - c.Spec.WalPvc = &cpov1.PVCVolume{ - OldWalDir: oldSpec.Spec.WalPvc.WalDir, - WalVolume: cpov1.Volume{}, - WalDir: "", - } + } if c.restoreInProgress() { diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 8e2f175c..065e9bc2 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -649,6 +649,10 @@ func isBootstrapOnlyParameter(param string) bool { return result } +func getWALPVCName(cluster_name string) string { + return "walpvc" + cluster_name +} + func generateVolumeMounts(volume cpov1.Volume) []v1.VolumeMount { return []v1.VolumeMount{ { @@ -659,10 +663,10 @@ func generateVolumeMounts(volume cpov1.Volume) []v1.VolumeMount { } } -func generateWalVolumeMounts(volume cpov1.Volume, dir string) v1.VolumeMount { +func generateWalVolumeMounts(volume cpov1.Volume, cluster_name string) v1.VolumeMount { return v1.VolumeMount{ - Name: dir, - MountPath: constants.PostgresWalMount, //TODO: fetch from manifest + Name: getWALPVCName(cluster_name), + MountPath: constants.PostgresPVCWalMount, SubPath: volume.SubPath, } } @@ -1013,12 +1017,9 @@ func (c *Cluster) generateSpiloPodEnvVars( envVars = append(envVars, v1.EnvVar{Name: "cpo_monitoring_stack", Value: "true"}) } if spec.WalPvc != nil { - if spec.WalPvc.WalDir != "" { - envVars = append(envVars, v1.EnvVar{Name: "WALDIR", Value: spec.WalPvc.WalDir}) + if spec.WalPvc != nil { + envVars = append(envVars, v1.EnvVar{Name: "WALDIR", Value: constants.PostgresPVCWalMount}) envVars = append(envVars, v1.EnvVar{Name: "OLD_WALDIR", Value: ""}) - } else if spec.WalPvc.OldWalDir != "" { - envVars = append(envVars, v1.EnvVar{Name: "WALDIR", Value: ""}) - envVars = append(envVars, v1.EnvVar{Name: "OLD_WALDIR", Value: spec.WalPvc.OldWalDir}) } } if c.OpConfig.EnablePgVersionEnvVar { @@ -1381,8 +1382,8 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu volumeMounts := generateVolumeMounts(spec.Volume) - if spec.WalPvc != nil && spec.WalPvc.WalDir != "" { - volumeMounts = append(volumeMounts, generateWalVolumeMounts(spec.WalPvc.WalVolume, spec.WalPvc.WalDir)) + if spec.WalPvc != nil { + volumeMounts = append(volumeMounts, generateWalVolumeMounts(*spec.WalPvc, c.Spec.ClusterName)) } // configure TLS with a custom secret volume @@ -1529,9 +1530,9 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu additionalVolumes = append(additionalVolumes, c.generateCertSecretVolume()) } } - if spec.WalPvc != nil && spec.WalPvc.WalDir != "" { - WalPvcClaim, err = c.generatePersistentVolumeClaimTemplate(spec.WalPvc.WalVolume.Size, - spec.WalPvc.WalVolume.StorageClass, spec.WalPvc.WalVolume.Selector, spec.WalPvc.WalDir) + if spec.WalPvc != nil { + WalPvcClaim, err = c.generatePersistentVolumeClaimTemplate(spec.WalPvc.Size, + spec.WalPvc.StorageClass, spec.WalPvc.Selector, getWALPVCName(spec.ClusterName)) if err != nil { c.logger.Errorf("could not generate volume claim template for WAL directory: %v", err) } @@ -1605,7 +1606,7 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu } final_vols := []v1.PersistentVolumeClaim{*volumeClaimTemplate} - if spec.WalPvc != nil && spec.WalPvc.WalDir != "" { + if spec.WalPvc != nil { final_vols = []v1.PersistentVolumeClaim{*volumeClaimTemplate, *WalPvcClaim} } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index c2e0ad97..cc83c9fc 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -1761,7 +1761,7 @@ func (c *Cluster) syncMonitoringSecret(oldSpec, newSpec *cpov1.Postgresql) error } func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) error { - c.logger.Info("syncing PVC for WAL with Spec") + c.logger.Info("syncing PVC for WAL") c.setProcessName("syncing PVC for WAL") if newSpec.Spec.WalPvc == nil && oldSpec.Spec.WalPvc != nil { @@ -1774,8 +1774,7 @@ func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) error { return fmt.Errorf("Could not list PVCs") } else { for _, pvc := range pvcs { - c.logger.Infof("Current PVC name is %v", pvc.Name) - if strings.Contains(pvc.Name, oldSpec.Spec.WalPvc.WalDir) { + if strings.Contains(pvc.Name, getWALPVCName(c.Spec.ClusterName)) { c.logger.Infof("deleting WAL-PVC %q", util.NameFromMeta(pvc.ObjectMeta)) if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions); err != nil { return fmt.Errorf("could not delete WAL PVC: %v", err) @@ -1784,17 +1783,10 @@ func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) error { } containers := c.Statefulset.Spec.Template.Spec.Containers for _, con := range containers { - c.logger.Infof("changing env-vars for wal pvc %v", con) - con.Env = append(con.Env, v1.EnvVar{Name: "WALDIR", Value: ""}) - con.Env = append(con.Env, v1.EnvVar{Name: "OLD_WALDIR", Value: oldSpec.Spec.WalPvc.WalDir}) - c.logger.Infof("changed env-vars for wal pvc %v", con.Env) + con.Env = append(con.Env, v1.EnvVar{Name: "WALDIR", Value: constants.PostgresWalMount}) + con.Env = append(con.Env, v1.EnvVar{Name: "OLD_WALDIR", Value: constants.PostgresPVCWalMount}) } } - c.Spec.WalPvc = &cpov1.PVCVolume{ - OldWalDir: oldSpec.Spec.WalPvc.WalDir, - WalVolume: cpov1.Volume{}, - WalDir: "", - } } return nil } diff --git a/pkg/util/constants/postgresql.go b/pkg/util/constants/postgresql.go index d32a7d66..cdb19393 100644 --- a/pkg/util/constants/postgresql.go +++ b/pkg/util/constants/postgresql.go @@ -19,5 +19,6 @@ const ( RunVolumeName = "postgresql-run" RunVolumePath = "/var/run/postgresql" - PostgresWalMount = "/home/postgres/pgwal" + PostgresWalMount = "/home/postgres/pgwal" + PostgresPVCWalMount = "/home/postgres/pvc/pgwal" ) From aef3851ac0d6a84ea41c357c3151eca69440b2b0 Mon Sep 17 00:00:00 2001 From: rafia sabih Date: Wed, 7 Aug 2024 18:58:04 +0200 Subject: [PATCH 08/11] Use Patroni SetPostgresParameters() to change log_directory --- pkg/cluster/cluster.go | 14 ++++++++++++++ pkg/cluster/sync.go | 16 ++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 533442f3..cfe4b347 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -447,6 +447,20 @@ func (c *Cluster) Create() (err error) { } } + if c.Spec.WalPvc != nil { + log_dir := map[string]string{ + "log_directory": constants.PostgresPVCWalMount, + } + pods, _ := c.listPodsOfType(TYPE_POSTGRESQL) + for _, p := range pods { + err := c.patroni.SetPostgresParameters(&p, log_dir) + if err != nil { + return fmt.Errorf("log_directory with pvc could not be set: %v", err) + } + } + + } + // remember slots to detect deletion from manifest for slotName, desiredSlot := range c.Spec.Patroni.Slots { c.replicationSlots[slotName] = desiredSlot diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index cc83c9fc..44c5d49a 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -1766,9 +1766,21 @@ func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) error { if newSpec.Spec.WalPvc == nil && oldSpec.Spec.WalPvc != nil { // if the wal_pvc is removed, then - // 1. Change env-vars + // 1. Change log_directory // 2. Remove the PVC - // 3. Remember the old dir name + // 3. Change env vars + + log_dir := map[string]string{ + "log_directory": constants.PostgresWalMount, + } + pods, _ := c.listPodsOfType(TYPE_POSTGRESQL) + for _, p := range pods { + err := c.patroni.SetPostgresParameters(&p, log_dir) + if err != nil { + return fmt.Errorf("log_directory with pvc could not be set: %v", err) + } + } + pvcs, err := c.listPersistentVolumeClaims() if err != nil { return fmt.Errorf("Could not list PVCs") From 7d09dcb2f76413b23fe1fb9f9e591a60511e974f Mon Sep 17 00:00:00 2001 From: Rafia Sabih Date: Thu, 15 Aug 2024 10:23:22 +0000 Subject: [PATCH 09/11] Add Env vars for the wal Pvc When using separate PVc for wal, USE_WAL_PVC is set otherwise False. Use the path in PVCWALDIR for moving wal only when USE_WAL_PVC is set. --- pkg/cluster/cluster.go | 15 +-------------- pkg/cluster/k8sres.go | 9 +++++---- pkg/cluster/sync.go | 21 --------------------- pkg/util/constants/postgresql.go | 3 +-- 4 files changed, 7 insertions(+), 41 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index cfe4b347..1cbd738c 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -447,20 +447,6 @@ func (c *Cluster) Create() (err error) { } } - if c.Spec.WalPvc != nil { - log_dir := map[string]string{ - "log_directory": constants.PostgresPVCWalMount, - } - pods, _ := c.listPodsOfType(TYPE_POSTGRESQL) - for _, p := range pods { - err := c.patroni.SetPostgresParameters(&p, log_dir) - if err != nil { - return fmt.Errorf("log_directory with pvc could not be set: %v", err) - } - } - - } - // remember slots to detect deletion from manifest for slotName, desiredSlot := range c.Spec.Patroni.Slots { c.replicationSlots[slotName] = desiredSlot @@ -1037,6 +1023,7 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { //sync WAL-PVC if !reflect.DeepEqual(oldSpec.Spec.WalPvc, newSpec.Spec.WalPvc) { + c.logger.Info("####### GOING to Update the pods") if err := c.syncWalPvc(oldSpec, newSpec); err != nil { c.logger.Warningf("could not sync PVC WAL %v", err) } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 065e9bc2..5a152601 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1016,11 +1016,12 @@ func (c *Cluster) generateSpiloPodEnvVars( if spec.Monitoring != nil { envVars = append(envVars, v1.EnvVar{Name: "cpo_monitoring_stack", Value: "true"}) } + envVars = append(envVars, v1.EnvVar{Name: "PVCWALDIR", Value: constants.PostgresPVCWalMount}) + if spec.WalPvc != nil { - if spec.WalPvc != nil { - envVars = append(envVars, v1.EnvVar{Name: "WALDIR", Value: constants.PostgresPVCWalMount}) - envVars = append(envVars, v1.EnvVar{Name: "OLD_WALDIR", Value: ""}) - } + envVars = append(envVars, v1.EnvVar{Name: "USE_PVC_WAL", Value: "true"}) + }else{ + envVars = append(envVars, v1.EnvVar{Name: "USE_PVC_WAL", Value: "false"}) } if c.OpConfig.EnablePgVersionEnvVar { envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.GetDesiredMajorVersion()}) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 44c5d49a..0f92bb29 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -1765,22 +1765,6 @@ func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) error { c.setProcessName("syncing PVC for WAL") if newSpec.Spec.WalPvc == nil && oldSpec.Spec.WalPvc != nil { - // if the wal_pvc is removed, then - // 1. Change log_directory - // 2. Remove the PVC - // 3. Change env vars - - log_dir := map[string]string{ - "log_directory": constants.PostgresWalMount, - } - pods, _ := c.listPodsOfType(TYPE_POSTGRESQL) - for _, p := range pods { - err := c.patroni.SetPostgresParameters(&p, log_dir) - if err != nil { - return fmt.Errorf("log_directory with pvc could not be set: %v", err) - } - } - pvcs, err := c.listPersistentVolumeClaims() if err != nil { return fmt.Errorf("Could not list PVCs") @@ -1793,11 +1777,6 @@ func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) error { } } } - containers := c.Statefulset.Spec.Template.Spec.Containers - for _, con := range containers { - con.Env = append(con.Env, v1.EnvVar{Name: "WALDIR", Value: constants.PostgresWalMount}) - con.Env = append(con.Env, v1.EnvVar{Name: "OLD_WALDIR", Value: constants.PostgresPVCWalMount}) - } } } return nil diff --git a/pkg/util/constants/postgresql.go b/pkg/util/constants/postgresql.go index cdb19393..0c7c2561 100644 --- a/pkg/util/constants/postgresql.go +++ b/pkg/util/constants/postgresql.go @@ -19,6 +19,5 @@ const ( RunVolumeName = "postgresql-run" RunVolumePath = "/var/run/postgresql" - PostgresWalMount = "/home/postgres/pgwal" - PostgresPVCWalMount = "/home/postgres/pvc/pgwal" + PostgresPVCWalMount = "/home/postgres/pvc/" ) From cd2508921590e0c52852992284425a9b3e2f4875 Mon Sep 17 00:00:00 2001 From: Rafia Sabih Date: Fri, 16 Aug 2024 07:31:59 +0000 Subject: [PATCH 10/11] Simplify env-vars --- pkg/cluster/k8sres.go | 6 ++---- pkg/cluster/sync.go | 3 +++ pkg/generated/clientset/versioned/fake/register.go | 14 +++++++------- .../clientset/versioned/scheme/register.go | 14 +++++++------- pkg/util/constants/postgresql.go | 1 + 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 5a152601..a31766ce 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1016,12 +1016,10 @@ func (c *Cluster) generateSpiloPodEnvVars( if spec.Monitoring != nil { envVars = append(envVars, v1.EnvVar{Name: "cpo_monitoring_stack", Value: "true"}) } - envVars = append(envVars, v1.EnvVar{Name: "PVCWALDIR", Value: constants.PostgresPVCWalMount}) if spec.WalPvc != nil { - envVars = append(envVars, v1.EnvVar{Name: "USE_PVC_WAL", Value: "true"}) - }else{ - envVars = append(envVars, v1.EnvVar{Name: "USE_PVC_WAL", Value: "false"}) + envVars = append(envVars, v1.EnvVar{Name: "WALDIR", Value: constants.PostgresPVCWalMount}) + envVars = append(envVars, v1.EnvVar{Name: "OLDWALDIR", Value: constants.PostgresWALPath}) } if c.OpConfig.EnablePgVersionEnvVar { envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.GetDesiredMajorVersion()}) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 0f92bb29..58859f90 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -1765,6 +1765,9 @@ func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) error { c.setProcessName("syncing PVC for WAL") if newSpec.Spec.WalPvc == nil && oldSpec.Spec.WalPvc != nil { + // run the script to move the wal files and then remove the pvc + //result, err = c.ExecCommand(podName, "scripts/move_wal_dir.sh" + constants.PostgresPVCWalMount + " " + constants.PostgresWALPath) + pvcs, err := c.listPersistentVolumeClaims() if err != nil { return fmt.Errorf("Could not list PVCs") diff --git a/pkg/generated/clientset/versioned/fake/register.go b/pkg/generated/clientset/versioned/fake/register.go index 9ac0f665..061b3064 100644 --- a/pkg/generated/clientset/versioned/fake/register.go +++ b/pkg/generated/clientset/versioned/fake/register.go @@ -45,14 +45,14 @@ var localSchemeBuilder = runtime.SchemeBuilder{ // AddToScheme adds all types of this clientset into the given scheme. This allows composition // of clientsets, like in: // -// import ( -// "k8s.io/client-go/kubernetes" -// clientsetscheme "k8s.io/client-go/kubernetes/scheme" -// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" -// ) +// import ( +// "k8s.io/client-go/kubernetes" +// clientsetscheme "k8s.io/client-go/kubernetes/scheme" +// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" +// ) // -// kclientset, _ := kubernetes.NewForConfig(c) -// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) +// kclientset, _ := kubernetes.NewForConfig(c) +// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) // // After this, RawExtensions in Kubernetes types will serialize kube-aggregator types // correctly. diff --git a/pkg/generated/clientset/versioned/scheme/register.go b/pkg/generated/clientset/versioned/scheme/register.go index 99ebde68..9ead92b8 100644 --- a/pkg/generated/clientset/versioned/scheme/register.go +++ b/pkg/generated/clientset/versioned/scheme/register.go @@ -45,14 +45,14 @@ var localSchemeBuilder = runtime.SchemeBuilder{ // AddToScheme adds all types of this clientset into the given scheme. This allows composition // of clientsets, like in: // -// import ( -// "k8s.io/client-go/kubernetes" -// clientsetscheme "k8s.io/client-go/kubernetes/scheme" -// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" -// ) +// import ( +// "k8s.io/client-go/kubernetes" +// clientsetscheme "k8s.io/client-go/kubernetes/scheme" +// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" +// ) // -// kclientset, _ := kubernetes.NewForConfig(c) -// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) +// kclientset, _ := kubernetes.NewForConfig(c) +// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) // // After this, RawExtensions in Kubernetes types will serialize kube-aggregator types // correctly. diff --git a/pkg/util/constants/postgresql.go b/pkg/util/constants/postgresql.go index 0c7c2561..4a1069f0 100644 --- a/pkg/util/constants/postgresql.go +++ b/pkg/util/constants/postgresql.go @@ -20,4 +20,5 @@ const ( RunVolumePath = "/var/run/postgresql" PostgresPVCWalMount = "/home/postgres/pvc/" + PostgresWALPath = PostgresDataPath + "/pg_wal" ) From 6abdf9ac7edfeab83b2de266aa59749034aedc41 Mon Sep 17 00:00:00 2001 From: Rafia Sabih Date: Fri, 16 Aug 2024 08:36:37 +0000 Subject: [PATCH 11/11] Placeholder for script --- pkg/cluster/cluster.go | 1 - pkg/cluster/k8sres.go | 4 ++-- pkg/cluster/sync.go | 14 ++++++++++---- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 1cbd738c..533442f3 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1023,7 +1023,6 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { //sync WAL-PVC if !reflect.DeepEqual(oldSpec.Spec.WalPvc, newSpec.Spec.WalPvc) { - c.logger.Info("####### GOING to Update the pods") if err := c.syncWalPvc(oldSpec, newSpec); err != nil { c.logger.Warningf("could not sync PVC WAL %v", err) } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index a31766ce..1ff5af7c 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -1018,8 +1018,8 @@ func (c *Cluster) generateSpiloPodEnvVars( } if spec.WalPvc != nil { - envVars = append(envVars, v1.EnvVar{Name: "WALDIR", Value: constants.PostgresPVCWalMount}) - envVars = append(envVars, v1.EnvVar{Name: "OLDWALDIR", Value: constants.PostgresWALPath}) + envVars = append(envVars, v1.EnvVar{Name: "NEWWALDIR", Value: constants.PostgresPVCWalMount}) + envVars = append(envVars, v1.EnvVar{Name: "OLDWALDIR",Value: ""}) } if c.OpConfig.EnablePgVersionEnvVar { envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.GetDesiredMajorVersion()}) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 58859f90..52207296 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -218,6 +218,10 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { return err } + if err = c.syncWalPvc(&oldSpec, newSpec); err != nil { + return fmt.Errorf("could not sync WAL-PVC: %v", err) + } + //sync volume may already transition volumes to gp3, if iops/throughput or type is specified if err = c.syncVolumes(); err != nil { return err @@ -287,10 +291,6 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { return fmt.Errorf("could not sync monitoring: %v", err) } - if err = c.syncWalPvc(&oldSpec, newSpec); err != nil { - return fmt.Errorf("could not sync WAL-PVC: %v", err) - } - if len(c.Spec.Streams) > 0 { c.logger.Debug("syncing streams") if err = c.syncStreams(); err != nil { @@ -1765,6 +1765,12 @@ func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) error { c.setProcessName("syncing PVC for WAL") if newSpec.Spec.WalPvc == nil && oldSpec.Spec.WalPvc != nil { + + containers := c.Statefulset.Spec.Template.Spec.Containers + for _, con := range containers { + con.Env = append(con.Env, v1.EnvVar{Name: "NEWWALDIR", Value: ""}) + con.Env = append(con.Env, v1.EnvVar{Name: "OLDWALDIR",Value: constants.PostgresPVCWalMount}) + } // run the script to move the wal files and then remove the pvc //result, err = c.ExecCommand(podName, "scripts/move_wal_dir.sh" + constants.PostgresPVCWalMount + " " + constants.PostgresWALPath)