Skip to content

Commit 7464d3c

Browse files
authored
CLOUDP-161398: Support multiple Private Endpoint per PE Service (#877)
1 parent e0d30da commit 7464d3c

File tree

10 files changed

+281
-107
lines changed

10 files changed

+281
-107
lines changed

.github/actions/cleanup/project/pe.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ func deleteAllPE(ctx context.Context, client mongodbatlas.PrivateEndpointsServic
2222

2323
func deletePrivateEndpointsFromAtlas(ctx context.Context, client mongodbatlas.PrivateEndpointsService, projectID string, listsToRemove []mongodbatlas.PrivateEndpointConnection) error {
2424
for _, peService := range listsToRemove {
25-
if interfaceEndpointID(peService) != "" {
26-
log.Printf("Deleting private endpoint %s", interfaceEndpointID(peService))
27-
if _, err := client.DeleteOnePrivateEndpoint(ctx, projectID, peService.ProviderName, peService.ID, interfaceEndpointID(peService)); err != nil {
25+
if firstInterfaceEndpointID(peService) != "" {
26+
log.Printf("Deleting private endpoint %s", firstInterfaceEndpointID(peService))
27+
if _, err := client.DeleteOnePrivateEndpoint(ctx, projectID, peService.ProviderName, peService.ID, firstInterfaceEndpointID(peService)); err != nil {
2828
return fmt.Errorf("error deleting private endpoint interface: %s", err)
2929
}
3030

@@ -39,8 +39,7 @@ func deletePrivateEndpointsFromAtlas(ctx context.Context, client mongodbatlas.Pr
3939
return nil
4040
}
4141

42-
func interfaceEndpointID(connection mongodbatlas.PrivateEndpointConnection) string {
43-
42+
func firstInterfaceEndpointID(connection mongodbatlas.PrivateEndpointConnection) string {
4443
if len(connection.InterfaceEndpoints) != 0 {
4544
return connection.InterfaceEndpoints[0]
4645
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ require (
9696
go.uber.org/atomic v1.9.0 // indirect
9797
go.uber.org/multierr v1.7.0 // indirect
9898
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
99+
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
99100
golang.org/x/net v0.2.0 // indirect
100101
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
101102
golang.org/x/sys v0.2.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
413413
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
414414
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
415415
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
416+
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb h1:PaBZQdo+iSDyHT053FjUCgZQ/9uqVwPOcl7KSWhKn6w=
417+
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
416418
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
417419
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
418420
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=

pkg/controller/atlasdeployment/deployment.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -271,12 +271,10 @@ func (r *AtlasDeploymentReconciler) ensureConnectionSecrets(ctx *workflow.Contex
271271
}
272272

273273
data := connectionsecret.ConnectionData{
274-
DBUserName: dbUser.Spec.Username,
275-
ConnURL: connectionStrings.Standard,
276-
SrvConnURL: connectionStrings.StandardSrv,
277-
Password: password,
278-
PvtConnURL: connectionStrings.Private,
279-
PvtSrvConnURL: connectionStrings.PrivateSrv,
274+
DBUserName: dbUser.Spec.Username,
275+
Password: password,
276+
ConnURL: connectionStrings.Standard,
277+
SrvConnURL: connectionStrings.StandardSrv,
280278
}
281279
connectionsecret.FillPrivateConnStrings(connectionStrings, &data)
282280

pkg/controller/atlasproject/private_endpoint.go

Lines changed: 108 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"errors"
66
"net/http"
77

8+
"golang.org/x/exp/slices"
9+
810
"go.mongodb.org/atlas/mongodbatlas"
911

1012
mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1"
@@ -56,7 +58,7 @@ func ensurePrivateEndpoint(ctx *workflow.Context, projectID string, project *mdb
5658
}
5759
}
5860

59-
interfaceStatus := getStatusForInterfaces(ctx, atlasPEs, projectID)
61+
interfaceStatus := getStatusForInterfaces(ctx, projectID, specPEs, atlasPEs)
6062
ctx.SetConditionFromResult(status.PrivateEndpointReadyType, interfaceStatus)
6163

6264
return interfaceStatus
@@ -72,9 +74,9 @@ func syncPrivateEndpointsWithAtlas(ctx *workflow.Context, projectID string, spec
7274
return result, status.PrivateEndpointServiceReadyType
7375
}
7476

75-
endpointsToCreate := getEndpointsNotInAtlas(specPEs, atlasPEs)
77+
endpointsToCreate, endpointCounts := getEndpointsNotInAtlas(specPEs, atlasPEs)
7678
log.Debugf("Number of Private Endpoints to create: %d", len(endpointsToCreate))
77-
newConnections, err := createPeServiceInAtlas(ctx, projectID, endpointsToCreate)
79+
newConnections, err := createPeServiceInAtlas(ctx, projectID, endpointsToCreate, endpointCounts)
7880
if err != nil {
7981
return terminateWithError(ctx, status.PrivateEndpointServiceReadyType, "Failed to create PE Service in Atlas", err)
8082
}
@@ -109,27 +111,37 @@ func getStatusForServices(ctx *workflow.Context, atlasPEs []atlasPE) workflow.Re
109111
return workflow.OK()
110112
}
111113

112-
func getStatusForInterfaces(ctx *workflow.Context, atlasPEs []atlasPE, projectID string) workflow.Result {
114+
func getStatusForInterfaces(ctx *workflow.Context, projectID string, specPEs []mdbv1.PrivateEndpoint, atlasPEs []atlasPE) workflow.Result {
115+
totalInterfaceCount := 0
116+
113117
for _, atlasPeService := range atlasPEs {
114-
interfaceEndpointID := atlasPeService.InterfaceEndpointID()
115-
if interfaceEndpointID == "" {
116-
return notReadyInterfaceResult
117-
}
118+
interfaceEndpointIDs := atlasPeService.InterfaceEndpointIDs()
119+
totalInterfaceCount += len(interfaceEndpointIDs)
118120

119-
interfaceEndpoint, _, err := ctx.Client.PrivateEndpoints.GetOnePrivateEndpoint(context.Background(), projectID, atlasPeService.ProviderName, atlasPeService.ID, interfaceEndpointID)
120-
if err != nil {
121-
return workflow.Terminate(workflow.Internal, err.Error())
122-
}
121+
for _, interfaceEndpointID := range interfaceEndpointIDs {
122+
if interfaceEndpointID == "" {
123+
return notReadyInterfaceResult
124+
}
123125

124-
interfaceIsAvailable, interfaceFailureMessage := checkIfInterfaceIsAvailable(interfaceEndpoint)
125-
if interfaceFailureMessage != "" {
126-
return workflow.Terminate(workflow.ProjectPEInterfaceIsNotReadyInAtlas, interfaceFailureMessage)
127-
}
128-
if !interfaceIsAvailable {
129-
return notReadyInterfaceResult
126+
interfaceEndpoint, _, err := ctx.Client.PrivateEndpoints.GetOnePrivateEndpoint(context.Background(), projectID, atlasPeService.ProviderName, atlasPeService.ID, interfaceEndpointID)
127+
if err != nil {
128+
return workflow.Terminate(workflow.Internal, err.Error())
129+
}
130+
131+
interfaceIsAvailable, interfaceFailureMessage := checkIfInterfaceIsAvailable(interfaceEndpoint)
132+
if interfaceFailureMessage != "" {
133+
return workflow.Terminate(workflow.ProjectPEInterfaceIsNotReadyInAtlas, interfaceFailureMessage)
134+
}
135+
if !interfaceIsAvailable {
136+
return notReadyInterfaceResult
137+
}
130138
}
131139
}
132140

141+
if len(specPEs) != totalInterfaceCount {
142+
return notReadyInterfaceResult
143+
}
144+
133145
return workflow.OK()
134146
}
135147

@@ -170,20 +182,20 @@ func (a atlasPE) Identifier() interface{} {
170182
return a.ProviderName + status.TransformRegionToID(a.RegionName)
171183
}
172184

173-
func (a atlasPE) InterfaceEndpointID() string {
185+
func (a atlasPE) InterfaceEndpointIDs() []string {
174186
if len(a.InterfaceEndpoints) != 0 {
175-
return a.InterfaceEndpoints[0]
187+
return a.InterfaceEndpoints
176188
}
177189

178190
if len(a.PrivateEndpoints) != 0 {
179-
return a.PrivateEndpoints[0]
191+
return a.PrivateEndpoints
180192
}
181193

182194
if len(a.EndpointGroupNames) != 0 {
183-
return a.EndpointGroupNames[0]
195+
return a.EndpointGroupNames
184196
}
185197

186-
return ""
198+
return nil
187199
}
188200

189201
func getAllPrivateEndpoints(client mongodbatlas.Client, projectID string) (result []atlasPE, err error) {
@@ -206,9 +218,9 @@ func getAllPrivateEndpoints(client mongodbatlas.Client, projectID string) (resul
206218
return
207219
}
208220

209-
func createPeServiceInAtlas(ctx *workflow.Context, projectID string, endpointsToCreate []mdbv1.PrivateEndpoint) (newConnections []atlasPE, err error) {
221+
func createPeServiceInAtlas(ctx *workflow.Context, projectID string, endpointsToCreate []mdbv1.PrivateEndpoint, endpointCounts []int) (newConnections []atlasPE, err error) {
210222
newConnections = make([]atlasPE, 0)
211-
for _, pe := range endpointsToCreate {
223+
for idx, pe := range endpointsToCreate {
212224
conn, _, err := ctx.Client.PrivateEndpoints.Create(context.Background(), projectID, &mongodbatlas.PrivateEndpointConnection{
213225
ProviderName: string(pe.Provider),
214226
Region: pe.Region,
@@ -219,7 +231,11 @@ func createPeServiceInAtlas(ctx *workflow.Context, projectID string, endpointsTo
219231

220232
conn.ProviderName = string(pe.Provider)
221233
conn.Region = pe.Region
222-
newConnections = append(newConnections, atlasPE(*conn))
234+
newConn := atlasPE(*conn)
235+
236+
for i := 0; i < endpointCounts[idx]; i++ {
237+
newConnections = append(newConnections, newConn)
238+
}
223239
}
224240

225241
return newConnections, nil
@@ -265,9 +281,9 @@ func endpointNeedsUpdating(specPeService mdbv1.PrivateEndpoint, atlasPeService a
265281
if isAvailable(atlasPeService.Status) && endpointDefinedInSpec(specPeService) {
266282
switch specPeService.Provider {
267283
case provider.ProviderAWS, provider.ProviderAzure:
268-
return specPeService.ID != atlasPeService.InterfaceEndpointID()
284+
return !slices.Contains(atlasPeService.InterfaceEndpointIDs(), specPeService.ID)
269285
case provider.ProviderGCP:
270-
return specPeService.EndpointGroupName != atlasPeService.InterfaceEndpointID() || len(atlasPeService.ServiceAttachmentNames) != len(specPeService.Endpoints)
286+
return !slices.Contains(atlasPeService.InterfaceEndpointIDs(), specPeService.EndpointGroupName) || len(atlasPeService.ServiceAttachmentNames) != len(specPeService.Endpoints)
271287
}
272288
}
273289

@@ -309,9 +325,12 @@ func deletePrivateEndpointsFromAtlas(ctx *workflow.Context, projectID string, li
309325
continue
310326
}
311327

312-
if peService.InterfaceEndpointID() != "" {
313-
if _, err := ctx.Client.PrivateEndpoints.DeleteOnePrivateEndpoint(context.Background(), projectID, peService.ProviderName, peService.ID, peService.InterfaceEndpointID()); err != nil {
314-
return workflow.Terminate(workflow.ProjectPEInterfaceIsNotReadyInAtlas, "failed to delete Private Endpoint")
328+
interfaceEndpointIDs := peService.InterfaceEndpointIDs()
329+
if len(interfaceEndpointIDs) != 0 {
330+
for _, interfaceEndpointID := range interfaceEndpointIDs {
331+
if _, err := ctx.Client.PrivateEndpoints.DeleteOnePrivateEndpoint(context.Background(), projectID, peService.ProviderName, peService.ID, interfaceEndpointID); err != nil {
332+
return workflow.Terminate(workflow.ProjectPEInterfaceIsNotReadyInAtlas, "failed to delete Private Endpoint")
333+
}
315334
}
316335

317336
continue
@@ -329,44 +348,56 @@ func deletePrivateEndpointsFromAtlas(ctx *workflow.Context, projectID string, li
329348

330349
func convertAllToStatus(ctx *workflow.Context, projectID string, peList []atlasPE) (result []status.ProjectPrivateEndpoint) {
331350
for _, endpoint := range peList {
332-
result = append(result, convertOneToStatus(ctx, projectID, endpoint))
351+
result = append(result, convertOneServiceToStatus(ctx, projectID, endpoint)...)
333352
}
334353

335354
return result
336355
}
337356

338-
func convertOneToStatus(ctx *workflow.Context, projectID string, conn atlasPE) status.ProjectPrivateEndpoint {
357+
func convertOneServiceToStatus(ctx *workflow.Context, projectID string, conn atlasPE) []status.ProjectPrivateEndpoint {
358+
interfaceEndpointIDs := conn.InterfaceEndpointIDs()
359+
360+
if len(interfaceEndpointIDs) == 0 {
361+
return []status.ProjectPrivateEndpoint{
362+
filledPEStatus(ctx, projectID, conn, ""),
363+
}
364+
}
365+
366+
result := make([]status.ProjectPrivateEndpoint, 0)
367+
for _, interfaceEndpointID := range interfaceEndpointIDs {
368+
result = append(result, filledPEStatus(ctx, projectID, conn, interfaceEndpointID))
369+
}
370+
371+
return result
372+
}
373+
374+
func filledPEStatus(ctx *workflow.Context, projectID string, conn atlasPE, InterfaceEndpointID string) status.ProjectPrivateEndpoint {
339375
pe := status.ProjectPrivateEndpoint{
340-
ID: conn.ID,
341-
Provider: provider.ProviderName(conn.ProviderName),
342-
Region: conn.Region,
376+
ID: conn.ID,
377+
Provider: provider.ProviderName(conn.ProviderName),
378+
Region: conn.Region,
379+
InterfaceEndpointID: InterfaceEndpointID,
343380
}
344381

345382
switch pe.Provider {
346383
case provider.ProviderAWS:
347384
pe.ServiceName = conn.EndpointServiceName
348385
pe.ServiceResourceID = conn.ID
349-
if len(conn.InterfaceEndpoints) != 0 {
350-
pe.InterfaceEndpointID = conn.InterfaceEndpoints[0]
351-
}
352386
case provider.ProviderAzure:
353387
pe.ServiceName = conn.PrivateLinkServiceName
354388
pe.ServiceResourceID = conn.PrivateLinkServiceResourceID
355-
if len(conn.PrivateEndpoints) != 0 {
356-
pe.InterfaceEndpointID = conn.PrivateEndpoints[0]
357-
}
358389
case provider.ProviderGCP:
359390
pe.ServiceAttachmentNames = conn.ServiceAttachmentNames
360-
if len(conn.EndpointGroupNames) != 0 {
391+
if InterfaceEndpointID != "" {
361392
var err error
362-
pe.InterfaceEndpointID = conn.EndpointGroupNames[0]
363393
pe.Endpoints, err = getGCPInterfaceEndpoint(ctx, projectID, pe)
364394
if err != nil {
365395
ctx.Log.Warnw("failed to get Interface Endpoint Data for GCP", "err", err, "pe", pe)
366396
}
367397
}
368398
}
369-
ctx.Log.Debugw("Converted Status", "status", pe, "connection", conn)
399+
400+
ctx.Log.Debugw("Converted One Status", "connection", conn, "private endpoint", pe)
370401

371402
return pe
372403
}
@@ -442,21 +473,42 @@ var notReadyServiceResult = workflow.InProgress(workflow.ProjectPEServiceIsNotRe
442473
var notReadyInterfaceResult = workflow.InProgress(workflow.ProjectPEInterfaceIsNotReadyInAtlas, "Interface Private Endpoint is not ready")
443474

444475
func getEndpointsNotInSpec(specPEs []mdbv1.PrivateEndpoint, atlasPEs []atlasPE) []atlasPE {
445-
difference := set.Difference(atlasPEs, specPEs)
446-
result := []atlasPE{}
447-
for _, item := range difference {
448-
result = append(result, item.(atlasPE))
449-
}
450-
return result
476+
uniqueItems, _ := getUniqueDifference(atlasPEs, specPEs)
477+
return uniqueItems
478+
}
479+
480+
func getEndpointsNotInAtlas(specPEs []mdbv1.PrivateEndpoint, atlasPEs []atlasPE) (toCreate []mdbv1.PrivateEndpoint, counts []int) {
481+
return getUniqueDifference(specPEs, atlasPEs)
451482
}
452483

453-
func getEndpointsNotInAtlas(specPEs []mdbv1.PrivateEndpoint, atlasPEs []atlasPE) []mdbv1.PrivateEndpoint {
454-
difference := set.Difference(specPEs, atlasPEs)
455-
result := []mdbv1.PrivateEndpoint{}
484+
func getUniqueDifference[ResultType interface{}, OtherType interface{}](left []ResultType, right []OtherType) (uniques []ResultType, counts []int) {
485+
difference := set.Difference(left, right)
486+
487+
uniqueItems := make(map[string]itemCount)
456488
for _, item := range difference {
457-
result = append(result, item.(mdbv1.PrivateEndpoint))
489+
key := item.Identifier().(string)
490+
if uniqueItem, found := uniqueItems[key]; found {
491+
uniqueItem.Count += 1
492+
uniqueItems[key] = uniqueItem
493+
} else {
494+
uniqueItems[key] = itemCount{
495+
Item: item,
496+
Count: 1,
497+
}
498+
}
458499
}
459-
return result
500+
501+
for _, value := range uniqueItems {
502+
uniques = append(uniques, value.Item.(ResultType))
503+
counts = append(counts, value.Count)
504+
}
505+
506+
return
507+
}
508+
509+
type itemCount struct {
510+
Item interface{}
511+
Count int
460512
}
461513

462514
func getEndpointsIntersection(specPEs []mdbv1.PrivateEndpoint, atlasPEs []atlasPE) []intersectionPair {

0 commit comments

Comments
 (0)