From da6fe5aa2bfb1469ba5327e7e7b67b189990c5c7 Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Thu, 7 Nov 2024 17:10:51 +0100 Subject: [PATCH 1/6] pkg/controller/atlasdatafederation: fix recocncile of private endpoints --- .../datafederation/conversion_endpoints.go | 45 ++++- .../conversion_endpoints_test.go | 3 +- .../datafederation_controller.go | 7 +- .../atlasdatafederation/private_endpoint.go | 109 ++++++++---- .../private_endpoint_test.go | 167 ++++++++++++++++++ test/e2e/datafederation_pe_test.go | 26 +++ 6 files changed, 316 insertions(+), 41 deletions(-) create mode 100644 pkg/controller/atlasdatafederation/private_endpoint_test.go diff --git a/internal/translation/datafederation/conversion_endpoints.go b/internal/translation/datafederation/conversion_endpoints.go index 813bbf8726..0e6c476d1f 100644 --- a/internal/translation/datafederation/conversion_endpoints.go +++ b/internal/translation/datafederation/conversion_endpoints.go @@ -1,13 +1,16 @@ package datafederation import ( + "encoding/json" "fmt" + "reflect" "go.mongodb.org/atlas-sdk/v20231115008/admin" "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/cmp" "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" ) type DatafederationPrivateEndpointEntry struct { @@ -15,13 +18,17 @@ type DatafederationPrivateEndpointEntry struct { ProjectID string } -func NewDatafederationPrivateEndpointEntry(pe *akov2.DataFederationPE, projectID string) *DatafederationPrivateEndpointEntry { +func NewDatafederationPrivateEndpointEntry(projectID string, pe *akov2.DataFederationPE) *DatafederationPrivateEndpointEntry { if pe == nil { return nil } return &DatafederationPrivateEndpointEntry{DataFederationPE: pe, ProjectID: projectID} } +func (e *DatafederationPrivateEndpointEntry) EqualsTo(target *DatafederationPrivateEndpointEntry) bool { + return reflect.DeepEqual(e.DataFederationPE, target.DataFederationPE) +} + func endpointsFromAtlas(endpoints []admin.PrivateNetworkEndpointIdEntry, projectID string) ([]*DatafederationPrivateEndpointEntry, error) { result := make([]*DatafederationPrivateEndpointEntry, 0, len(endpoints)) for _, entry := range endpoints { @@ -58,3 +65,39 @@ func endpointToAtlas(ep *DatafederationPrivateEndpointEntry) *admin.PrivateNetwo Type: pointer.MakePtrOrNil(ep.Type), } } + +type DataFederationPrivateEndpoint struct { + AKO, Atlas, LastApplied *DatafederationPrivateEndpointEntry +} + +func MapDatafederationPrivateEndpoints(projectID string, df *akov2.AtlasDataFederation, atlasEndpoints []*DatafederationPrivateEndpointEntry) (map[string]*DataFederationPrivateEndpoint, error) { + var lastApplied akov2.AtlasDataFederation + if ann, ok := df.GetAnnotations()[customresource.AnnotationLastAppliedConfiguration]; ok { + err := json.Unmarshal([]byte(ann), &lastApplied.Spec) + if err != nil { + return nil, fmt.Errorf("error reading data federation from last applied annotation: %w", err) + } + } + + result := make(map[string]*DataFederationPrivateEndpoint) + addEndpoint := func(endpointID string) { + if _, ok := result[endpointID]; !ok { + result[endpointID] = &DataFederationPrivateEndpoint{} + } + } + + for _, pe := range atlasEndpoints { + addEndpoint(pe.EndpointID) + result[pe.EndpointID].Atlas = pe + } + for _, pe := range df.Spec.PrivateEndpoints { + addEndpoint(pe.EndpointID) + result[pe.EndpointID].AKO = NewDatafederationPrivateEndpointEntry(projectID, &pe) + } + for _, pe := range lastApplied.Spec.PrivateEndpoints { + addEndpoint(pe.EndpointID) + result[pe.EndpointID].LastApplied = NewDatafederationPrivateEndpointEntry(projectID, &pe) + } + + return result, nil +} diff --git a/internal/translation/datafederation/conversion_endpoints_test.go b/internal/translation/datafederation/conversion_endpoints_test.go index 5d850b97e9..24693bfe8f 100644 --- a/internal/translation/datafederation/conversion_endpoints_test.go +++ b/internal/translation/datafederation/conversion_endpoints_test.go @@ -1,7 +1,6 @@ package datafederation import ( - "reflect" "testing" "github.com/google/go-cmp/cmp" @@ -21,7 +20,7 @@ func TestRoundtrip_DataFederationPE(t *testing.T) { toAtlasResult := endpointToAtlas(fuzzed) fromAtlasResult := endpointFromAtlas(toAtlasResult, "") - equals := reflect.DeepEqual(fuzzed, fromAtlasResult) + equals := fuzzed.EqualsTo(fromAtlasResult) if !equals { t.Log(cmp.Diff(fuzzed, fromAtlasResult)) } diff --git a/pkg/controller/atlasdatafederation/datafederation_controller.go b/pkg/controller/atlasdatafederation/datafederation_controller.go index 1241932b4b..f8af18b6f7 100644 --- a/pkg/controller/atlasdatafederation/datafederation_controller.go +++ b/pkg/controller/atlasdatafederation/datafederation_controller.go @@ -116,8 +116,8 @@ func (r *AtlasDataFederationReconciler) Reconcile(context context.Context, req c return result.ReconcileResult(), nil } - if result = r.ensurePrivateEndpoints(ctx, project, dataFederation, endpointService); !result.IsOk() { - ctx.SetConditionFromResult(api.DataFederationPEReadyType, result) + if result = r.ensurePrivateEndpoints(ctx, endpointService, project, dataFederation); !result.IsOk() { + ctx.SetConditionFromResult(api.DataFederationReadyType, result) return result.ReconcileResult(), nil } @@ -145,7 +145,7 @@ func (r *AtlasDataFederationReconciler) Reconcile(context context.Context, req c return r.handleDelete(ctx, log, dataFederation, project, dataFederationService).ReconcileResult(), nil } - err = customresource.ApplyLastConfigApplied(context, project, r.Client) + err = customresource.ApplyLastConfigApplied(context, dataFederation, r.Client) if err != nil { result = workflow.Terminate(workflow.Internal, err.Error()) ctx.SetConditionFromResult(api.DataFederationReadyType, result) @@ -154,6 +154,7 @@ func (r *AtlasDataFederationReconciler) Reconcile(context context.Context, req c return result.ReconcileResult(), nil } + ctx.SetConditionTrue(api.DataFederationReadyType) ctx.SetConditionTrue(api.ReadyType) return workflow.OK().ReconcileResult(), nil } diff --git a/pkg/controller/atlasdatafederation/private_endpoint.go b/pkg/controller/atlasdatafederation/private_endpoint.go index d616f2e34c..cadf6f5121 100644 --- a/pkg/controller/atlasdatafederation/private_endpoint.go +++ b/pkg/controller/atlasdatafederation/private_endpoint.go @@ -1,64 +1,103 @@ package atlasdatafederation import ( - "context" + "fmt" "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/datafederation" - - "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/set" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" ) -func (r *AtlasDataFederationReconciler) ensurePrivateEndpoints(ctx *workflow.Context, project *akov2.AtlasProject, dataFederation *akov2.AtlasDataFederation, service datafederation.DatafederationPrivateEndpointService) workflow.Result { +func (r *AtlasDataFederationReconciler) ensurePrivateEndpoints(ctx *workflow.Context, service datafederation.DatafederationPrivateEndpointService, project *akov2.AtlasProject, dataFederation *akov2.AtlasDataFederation) workflow.Result { projectID := project.ID() - specPEs := make([]*datafederation.DatafederationPrivateEndpointEntry, 0, len(dataFederation.Spec.PrivateEndpoints)) - for _, pe := range dataFederation.Spec.PrivateEndpoints { - specPEs = append(specPEs, datafederation.NewDatafederationPrivateEndpointEntry(&pe, projectID)) + fromAtlas, err := service.List(ctx.Context, projectID) + if err != nil { + return r.privateEndpointsFailed(ctx, err) } - //NewDatafederationPrivateEndpointEntry - atlasPEs, err := getAllDataFederationPEs(ctx.Context, service, projectID) + m, err := datafederation.MapDatafederationPrivateEndpoints(projectID, dataFederation, fromAtlas) if err != nil { - ctx.Log.Debugw("getAllDataFederationPEs error", "err", err.Error()) + return r.privateEndpointsFailed(ctx, err) + } + + for _, endpoint := range m { + endpointReconciler := &PrivateEndpointReconciler{service, endpoint} + if err := endpointReconciler.Reconcile(ctx); err != nil { + return r.privateEndpointsFailed(ctx, err) + } } - result := syncPrivateEndpointsWithAtlas(ctx, service, projectID, specPEs, atlasPEs) - if !result.IsOk() { - ctx.SetConditionFromResult(api.DataFederationPEReadyType, result) - return result + if len(dataFederation.Spec.PrivateEndpoints) == 0 { + return r.privateEndpointsUnmanage(ctx) } + return r.privateEndpointsIdle(ctx) +} + +func (r *AtlasDataFederationReconciler) privateEndpointsFailed(ctx *workflow.Context, err error) workflow.Result { + ctx.Log.Errorw("getAllDataFederationPEs error", "err", err.Error()) + result := workflow.Terminate(workflow.Internal, err.Error()) + ctx.SetConditionFromResult(api.DataFederationPEReadyType, result) + return result +} + +func (r *AtlasDataFederationReconciler) privateEndpointsIdle(ctx *workflow.Context) workflow.Result { + ctx.SetConditionTrue(api.DataFederationPEReadyType) return workflow.OK() } -func syncPrivateEndpointsWithAtlas(ctx *workflow.Context, service datafederation.DatafederationPrivateEndpointService, projectID string, specPEs, atlasPEs []*datafederation.DatafederationPrivateEndpointEntry) workflow.Result { - endpointsToCreate := set.Difference(specPEs, atlasPEs) - ctx.Log.Debugw("Data Federation PEs to Create", "endpoints", endpointsToCreate) - for _, e := range endpointsToCreate { - endpoint := e.(*datafederation.DatafederationPrivateEndpointEntry) - if err := service.Create(ctx.Context, endpoint); err != nil { - return workflow.Terminate(workflow.Internal, err.Error()) - } +func (r *AtlasDataFederationReconciler) privateEndpointsUnmanage(ctx *workflow.Context) workflow.Result { + ctx.UnsetCondition(api.DataFederationPEReadyType) + return workflow.OK() +} + +type PrivateEndpointReconciler struct { + service datafederation.DatafederationPrivateEndpointService + endpoint *datafederation.DataFederationPrivateEndpoint +} + +func (r *PrivateEndpointReconciler) Reconcile(ctx *workflow.Context) error { + inAKO := r.endpoint.AKO != nil + inAtlas := r.endpoint.Atlas != nil + inLastApplied := r.endpoint.LastApplied != nil + + switch { + case inAKO && !inAtlas: + return r.create(ctx) + case inAKO: + return r.update(ctx) + case inAtlas && inLastApplied: + // only delete private endpoints that used to be tracked in AKO + return r.delete(ctx) } - endpointsToDelete := set.Difference(atlasPEs, specPEs) - ctx.Log.Debugw("Data Federation PEs to Delete", "endpoints", endpointsToDelete) - for _, item := range endpointsToDelete { - endpoint := item.(*datafederation.DatafederationPrivateEndpointEntry) - if err := service.Delete(ctx.Context, endpoint); err != nil { - return workflow.Terminate(workflow.Internal, err.Error()) - } + return nil +} + +func (r *PrivateEndpointReconciler) create(ctx *workflow.Context) error { + if err := r.service.Create(ctx.Context, r.endpoint.AKO); err != nil { + return fmt.Errorf("error creating private endpoint: %w", err) } + return nil +} - return workflow.OK() +func (r *PrivateEndpointReconciler) delete(ctx *workflow.Context) error { + if err := r.service.Delete(ctx.Context, r.endpoint.Atlas); err != nil { + return fmt.Errorf("error deleting private endpoint: %w", err) + } + return nil } -func getAllDataFederationPEs(ctx context.Context, service datafederation.DatafederationPrivateEndpointService, projectID string) (endpoints []*datafederation.DatafederationPrivateEndpointEntry, err error) { - endpoints, err = service.List(ctx, projectID) - if endpoints == nil { - endpoints = make([]*datafederation.DatafederationPrivateEndpointEntry, 0) +func (r *PrivateEndpointReconciler) update(ctx *workflow.Context) error { + if r.endpoint.AKO.EqualsTo(r.endpoint.Atlas) { + return nil + } + if err := r.delete(ctx); err != nil { + return err + } + if err := r.create(ctx); err != nil { + return err } - return + return nil } diff --git a/pkg/controller/atlasdatafederation/private_endpoint_test.go b/pkg/controller/atlasdatafederation/private_endpoint_test.go new file mode 100644 index 0000000000..ae91abc3c3 --- /dev/null +++ b/pkg/controller/atlasdatafederation/private_endpoint_test.go @@ -0,0 +1,167 @@ +package atlasdatafederation + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/mocks/translation" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/datafederation" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" +) + +func TestEnsurePrivateEndpoints(t *testing.T) { + for _, tc := range []struct { + name string + project *akov2.AtlasProject + dataFederation *akov2.AtlasDataFederation + service func(mock *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService + + wantOK bool + wantConditions []api.Condition + }{ + { + name: "empty data federation", + service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { + m.EXPECT().List(mock.Anything, mock.Anything).Return([]*datafederation.DatafederationPrivateEndpointEntry{}, nil) + return m + }, + project: &akov2.AtlasProject{}, + dataFederation: &akov2.AtlasDataFederation{}, + wantOK: true, + wantConditions: []api.Condition{}, + }, + { + name: "error in atlas", + service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { + m.EXPECT().List(mock.Anything, mock.Anything).Return(nil, errors.New("atlas error")) + return m + }, + project: &akov2.AtlasProject{}, + dataFederation: &akov2.AtlasDataFederation{}, + wantOK: false, + wantConditions: []api.Condition{ + { + Type: "DataFederationPrivateEndpointsReady", + Status: "False", Reason: "InternalError", Message: "atlas error", + }, + }, + }, + { + name: "create entry in atlas", + service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { + m.EXPECT().List(mock.Anything, mock.Anything).Return(nil, nil) + m.EXPECT().Create(mock.Anything, mock.Anything).Return(nil) + return m + }, + project: &akov2.AtlasProject{}, + dataFederation: &akov2.AtlasDataFederation{ + Spec: akov2.DataFederationSpec{ + PrivateEndpoints: []akov2.DataFederationPE{ + {EndpointID: "123", Provider: "foo", Type: "some"}, + }, + }, + }, + wantOK: true, + wantConditions: []api.Condition{ + {Type: "DataFederationPrivateEndpointsReady", Status: "True"}, + }, + }, + { + name: "delete and update entry in atlas", + service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { + m.EXPECT().List(mock.Anything, mock.Anything).Return([]*datafederation.DatafederationPrivateEndpointEntry{ + {DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "foo", Type: "some"}}, + {DataFederationPE: &akov2.DataFederationPE{EndpointID: "456", Provider: "bar", Type: "some"}}, + }, nil) + m.EXPECT().Delete(mock.Anything, &datafederation.DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "foo", Type: "some"}, + }).Return(nil) + m.EXPECT().Create(mock.Anything, &datafederation.DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "CHANGE", Type: "some"}, + }).Return(nil) + return m + }, + project: &akov2.AtlasProject{}, + dataFederation: &akov2.AtlasDataFederation{ + Spec: akov2.DataFederationSpec{ + PrivateEndpoints: []akov2.DataFederationPE{ + {EndpointID: "123", Provider: "CHANGE", Type: "some"}, + }, + }, + }, + wantOK: true, + wantConditions: []api.Condition{ + {Type: "DataFederationPrivateEndpointsReady", Status: "True"}, + }, + }, + { + name: "do not delete untracked entry in atlas", + service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { + m.EXPECT().List(mock.Anything, mock.Anything).Return([]*datafederation.DatafederationPrivateEndpointEntry{ + {DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "foo", Type: "some"}}, + }, nil) + return m + }, + project: &akov2.AtlasProject{}, + dataFederation: &akov2.AtlasDataFederation{}, + wantOK: true, + wantConditions: []api.Condition{}, + }, + { + name: "delete tracked entry in atlas", + service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { + m.EXPECT().List(mock.Anything, mock.Anything).Return([]*datafederation.DatafederationPrivateEndpointEntry{ + {DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "foo", Type: "some"}}, + }, nil) + m.EXPECT().Delete(mock.Anything, &datafederation.DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "foo", Type: "some"}, + }).Return(nil) + return m + }, + project: &akov2.AtlasProject{}, + dataFederation: &akov2.AtlasDataFederation{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "mongodb.com/last-applied-configuration": ` +{ + "privateEndpoints": [ + { + "endpointId": "123", + "provider": "foo", + "type": "some" + } + ] + }`, + }, + }, + }, + wantOK: true, + wantConditions: []api.Condition{}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ctx := &workflow.Context{ + Context: context.Background(), + Log: zaptest.NewLogger(t).Sugar(), + } + + reconciler := &AtlasDataFederationReconciler{} + result := reconciler.ensurePrivateEndpoints(ctx, tc.service(translation.NewDatafederationPrivateEndpointServiceMock(t)), tc.project, tc.dataFederation) + require.Equal(t, tc.wantOK, result.IsOk()) + + gotConditions := ctx.Conditions() + for i := range ctx.Conditions() { + gotConditions[i].LastTransitionTime = metav1.Time{} + } + require.Equal(t, tc.wantConditions, gotConditions) + }) + } +} diff --git a/test/e2e/datafederation_pe_test.go b/test/e2e/datafederation_pe_test.go index 7e2aa2fa8f..96257414fd 100644 --- a/test/e2e/datafederation_pe_test.go +++ b/test/e2e/datafederation_pe_test.go @@ -99,6 +99,32 @@ var _ = Describe("UserLogin", Label("datafederation"), func() { testData.Project.Name, dataFederationInstanceName, testData.Project.Namespace).WithPrivateEndpoint(pe.ID, "AWS", "DATA_LAKE") + createdDataFederation.Spec.Storage = &akov2.Storage{ + Databases: []akov2.Database{ + { + Name: "test-db-1", + Collections: []akov2.Collection{ + { + Name: "test-collection-1", + DataSources: []akov2.DataSource{ + { + StoreName: "http-test", + Urls: []string{ + "https://data.cityofnewyork.us/api/views/vfnx-vebw/rows.csv", + }, + }, + }, + }, + }, + }, + }, + Stores: []akov2.Store{ + { + Name: "http-test", + Provider: "http", + }, + }, + } Expect(testData.K8SClient.Create(context.Background(), createdDataFederation)).ShouldNot(HaveOccurred()) Eventually(func(g Gomega) { From 8d28c348e4924bad2e375b5e9756b84b81ce6a4d Mon Sep 17 00:00:00 2001 From: Helder Santana Date: Thu, 14 Nov 2024 18:38:38 +0100 Subject: [PATCH 2/6] improve test coverage --- .../datafederation/conversion_endpoints.go | 6 +- .../conversion_endpoints_test.go | 201 ++++++++++++++++++ .../private_endpoint_test.go | 172 +++++++++++++++ 3 files changed, 376 insertions(+), 3 deletions(-) diff --git a/internal/translation/datafederation/conversion_endpoints.go b/internal/translation/datafederation/conversion_endpoints.go index 0e6c476d1f..603213b5fa 100644 --- a/internal/translation/datafederation/conversion_endpoints.go +++ b/internal/translation/datafederation/conversion_endpoints.go @@ -18,7 +18,7 @@ type DatafederationPrivateEndpointEntry struct { ProjectID string } -func NewDatafederationPrivateEndpointEntry(projectID string, pe *akov2.DataFederationPE) *DatafederationPrivateEndpointEntry { +func NewDataFederationPrivateEndpointEntry(projectID string, pe *akov2.DataFederationPE) *DatafederationPrivateEndpointEntry { if pe == nil { return nil } @@ -92,11 +92,11 @@ func MapDatafederationPrivateEndpoints(projectID string, df *akov2.AtlasDataFede } for _, pe := range df.Spec.PrivateEndpoints { addEndpoint(pe.EndpointID) - result[pe.EndpointID].AKO = NewDatafederationPrivateEndpointEntry(projectID, &pe) + result[pe.EndpointID].AKO = NewDataFederationPrivateEndpointEntry(projectID, &pe) } for _, pe := range lastApplied.Spec.PrivateEndpoints { addEndpoint(pe.EndpointID) - result[pe.EndpointID].LastApplied = NewDatafederationPrivateEndpointEntry(projectID, &pe) + result[pe.EndpointID].LastApplied = NewDataFederationPrivateEndpointEntry(projectID, &pe) } return result, nil diff --git a/internal/translation/datafederation/conversion_endpoints_test.go b/internal/translation/datafederation/conversion_endpoints_test.go index 24693bfe8f..bbacbe1873 100644 --- a/internal/translation/datafederation/conversion_endpoints_test.go +++ b/internal/translation/datafederation/conversion_endpoints_test.go @@ -5,7 +5,12 @@ import ( "github.com/google/go-cmp/cmp" fuzz "github.com/google/gofuzz" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" ) func TestRoundtrip_DataFederationPE(t *testing.T) { @@ -27,3 +32,199 @@ func TestRoundtrip_DataFederationPE(t *testing.T) { require.True(t, equals) } } + +func TestMapDatafederationPrivateEndpoints(t *testing.T) { + tests := map[string]struct { + dataFederation *akov2.AtlasDataFederation + endpoints []*DatafederationPrivateEndpointEntry + expectedResult map[string]*DataFederationPrivateEndpoint + expectedErr string + }{ + "failed to parse last config applied annotation": { + dataFederation: &akov2.AtlasDataFederation{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + customresource.AnnotationLastAppliedConfiguration: "wrong,", + }, + }, + }, + endpoints: []*DatafederationPrivateEndpointEntry{}, + expectedResult: nil, + expectedErr: "error reading data federation from last applied annotation: invalid character 'w' looking for beginning of value", + }, + "map without last applied configuration": { + dataFederation: &akov2.AtlasDataFederation{ + Spec: akov2.DataFederationSpec{ + PrivateEndpoints: []akov2.DataFederationPE{ + { + Provider: "AWS", + Type: "DATA_LAKE", + EndpointID: "vpcpe-123456", + }, + { + Provider: "AZURE", + Type: "DATA_LAKE", + EndpointID: "azure/resource/id", + }, + }, + }, + }, + endpoints: []*DatafederationPrivateEndpointEntry{ + { + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AWS", + Type: "DATA_LAKE", + EndpointID: "vpcpe-123456", + }, + ProjectID: "project-id", + }, + { + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AZURE", + Type: "DATA_LAKE", + EndpointID: "azure/resource/id", + }, + ProjectID: "project-id", + }, + }, + expectedResult: map[string]*DataFederationPrivateEndpoint{ + "vpcpe-123456": { + AKO: &DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AWS", + Type: "DATA_LAKE", + EndpointID: "vpcpe-123456", + }, + ProjectID: "project-id", + }, + Atlas: &DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AWS", + Type: "DATA_LAKE", + EndpointID: "vpcpe-123456", + }, + ProjectID: "project-id", + }, + LastApplied: nil, + }, + "azure/resource/id": { + AKO: &DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AZURE", + Type: "DATA_LAKE", + EndpointID: "azure/resource/id", + }, + ProjectID: "project-id", + }, + Atlas: &DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AZURE", + Type: "DATA_LAKE", + EndpointID: "azure/resource/id", + }, + ProjectID: "project-id", + }, + LastApplied: nil, + }, + }, + }, + "map with last applied configuration": { + dataFederation: &akov2.AtlasDataFederation{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + customresource.AnnotationLastAppliedConfiguration: "{\"name\":\"\",\"privateEndpoints\":[{\"endpointId\":\"vpcpe-123456\"," + + "\"provider\":\"AWS\",\"type\":\"DATA_LAKE\"}]}", + }, + }, + Spec: akov2.DataFederationSpec{ + PrivateEndpoints: []akov2.DataFederationPE{ + { + Provider: "AWS", + Type: "DATA_LAKE", + EndpointID: "vpcpe-123456", + }, + { + Provider: "AZURE", + Type: "DATA_LAKE", + EndpointID: "azure/resource/id", + }, + }, + }, + }, + endpoints: []*DatafederationPrivateEndpointEntry{ + { + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AWS", + Type: "DATA_LAKE", + EndpointID: "vpcpe-123456", + }, + ProjectID: "project-id", + }, + { + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AZURE", + Type: "DATA_LAKE", + EndpointID: "azure/resource/id", + }, + ProjectID: "project-id", + }, + }, + expectedResult: map[string]*DataFederationPrivateEndpoint{ + "vpcpe-123456": { + AKO: &DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AWS", + Type: "DATA_LAKE", + EndpointID: "vpcpe-123456", + }, + ProjectID: "project-id", + }, + Atlas: &DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AWS", + Type: "DATA_LAKE", + EndpointID: "vpcpe-123456", + }, + ProjectID: "project-id", + }, + LastApplied: &DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AWS", + Type: "DATA_LAKE", + EndpointID: "vpcpe-123456", + }, + ProjectID: "project-id", + }, + }, + "azure/resource/id": { + AKO: &DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AZURE", + Type: "DATA_LAKE", + EndpointID: "azure/resource/id", + }, + ProjectID: "project-id", + }, + Atlas: &DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{ + Provider: "AZURE", + Type: "DATA_LAKE", + EndpointID: "azure/resource/id", + }, + ProjectID: "project-id", + }, + LastApplied: nil, + }, + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + m, err := MapDatafederationPrivateEndpoints("project-id", tt.dataFederation, tt.endpoints) + if err != nil { + assert.EqualError(t, err, tt.expectedErr) + } + assert.Equal(t, tt.expectedResult, m) + }) + } +} diff --git a/pkg/controller/atlasdatafederation/private_endpoint_test.go b/pkg/controller/atlasdatafederation/private_endpoint_test.go index ae91abc3c3..6fa143d356 100644 --- a/pkg/controller/atlasdatafederation/private_endpoint_test.go +++ b/pkg/controller/atlasdatafederation/private_endpoint_test.go @@ -54,6 +54,31 @@ func TestEnsurePrivateEndpoints(t *testing.T) { }, }, }, + { + name: "failed when last applied configuration annotation has wrong data", + service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { + m.EXPECT().List(mock.Anything, mock.Anything).Return(nil, nil) + + return m + }, + project: &akov2.AtlasProject{}, + dataFederation: &akov2.AtlasDataFederation{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "mongodb.com/last-applied-configuration": "{wrongJson", + }, + }, + }, + wantOK: false, + wantConditions: []api.Condition{ + { + Type: "DataFederationPrivateEndpointsReady", + Status: "False", + Reason: "InternalError", + Message: "error reading data federation from last applied annotation: invalid character 'w' looking for beginning of object key string", + }, + }, + }, { name: "create entry in atlas", service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { @@ -74,6 +99,31 @@ func TestEnsurePrivateEndpoints(t *testing.T) { {Type: "DataFederationPrivateEndpointsReady", Status: "True"}, }, }, + { + name: "failed to create entry in atlas", + service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { + m.EXPECT().List(mock.Anything, mock.Anything).Return(nil, nil) + m.EXPECT().Create(mock.Anything, mock.Anything).Return(errors.New("failed to create entry")) + return m + }, + project: &akov2.AtlasProject{}, + dataFederation: &akov2.AtlasDataFederation{ + Spec: akov2.DataFederationSpec{ + PrivateEndpoints: []akov2.DataFederationPE{ + {EndpointID: "123", Provider: "foo", Type: "some"}, + }, + }, + }, + wantOK: false, + wantConditions: []api.Condition{ + { + Type: "DataFederationPrivateEndpointsReady", + Status: "False", + Reason: "InternalError", + Message: "error creating private endpoint: failed to create entry", + }, + }, + }, { name: "delete and update entry in atlas", service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { @@ -102,6 +152,90 @@ func TestEnsurePrivateEndpoints(t *testing.T) { {Type: "DataFederationPrivateEndpointsReady", Status: "True"}, }, }, + { + name: "nothing to update in atlas", + service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { + m.EXPECT().List(mock.Anything, mock.Anything).Return([]*datafederation.DatafederationPrivateEndpointEntry{ + {DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "foo", Type: "some"}}, + }, nil) + return m + }, + project: &akov2.AtlasProject{}, + dataFederation: &akov2.AtlasDataFederation{ + Spec: akov2.DataFederationSpec{ + PrivateEndpoints: []akov2.DataFederationPE{ + {EndpointID: "123", Provider: "foo", Type: "some"}, + }, + }, + }, + wantOK: true, + wantConditions: []api.Condition{ + {Type: "DataFederationPrivateEndpointsReady", Status: "True"}, + }, + }, + { + name: "failed to delete when updating entry in atlas", + service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { + m.EXPECT().List(mock.Anything, mock.Anything).Return([]*datafederation.DatafederationPrivateEndpointEntry{ + {DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "foo", Type: "some"}}, + {DataFederationPE: &akov2.DataFederationPE{EndpointID: "456", Provider: "bar", Type: "some"}}, + }, nil) + m.EXPECT().Delete(mock.Anything, &datafederation.DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "foo", Type: "some"}, + }).Return(errors.New("failed to delete entry")) + return m + }, + project: &akov2.AtlasProject{}, + dataFederation: &akov2.AtlasDataFederation{ + Spec: akov2.DataFederationSpec{ + PrivateEndpoints: []akov2.DataFederationPE{ + {EndpointID: "123", Provider: "CHANGE", Type: "some"}, + }, + }, + }, + wantOK: false, + wantConditions: []api.Condition{ + { + Type: "DataFederationPrivateEndpointsReady", + Status: "False", + Reason: "InternalError", + Message: "error deleting private endpoint: failed to delete entry", + }, + }, + }, + { + name: "failed to create when updating entry in atlas", + service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { + m.EXPECT().List(mock.Anything, mock.Anything).Return([]*datafederation.DatafederationPrivateEndpointEntry{ + {DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "foo", Type: "some"}}, + {DataFederationPE: &akov2.DataFederationPE{EndpointID: "456", Provider: "bar", Type: "some"}}, + }, nil) + m.EXPECT().Delete(mock.Anything, &datafederation.DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "foo", Type: "some"}, + }).Return(nil) + m.EXPECT().Create(mock.Anything, &datafederation.DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "CHANGE", Type: "some"}, + }).Return(errors.New("failed to create entry")) + return m + }, + project: &akov2.AtlasProject{}, + dataFederation: &akov2.AtlasDataFederation{ + Spec: akov2.DataFederationSpec{ + PrivateEndpoints: []akov2.DataFederationPE{ + {EndpointID: "123", Provider: "CHANGE", Type: "some"}, + }, + }, + }, + wantOK: false, + wantConditions: []api.Condition{ + { + Type: "DataFederationPrivateEndpointsReady", + Status: "False", + Reason: "InternalError", + Message: "error creating private endpoint: failed to create entry", + }, + }, + }, { name: "do not delete untracked entry in atlas", service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { @@ -146,6 +280,44 @@ func TestEnsurePrivateEndpoints(t *testing.T) { wantOK: true, wantConditions: []api.Condition{}, }, + { + name: "failed to delete tracked entry in atlas", + service: func(m *translation.DatafederationPrivateEndpointServiceMock) datafederation.DatafederationPrivateEndpointService { + m.EXPECT().List(mock.Anything, mock.Anything).Return([]*datafederation.DatafederationPrivateEndpointEntry{ + {DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "foo", Type: "some"}}, + }, nil) + m.EXPECT().Delete(mock.Anything, &datafederation.DatafederationPrivateEndpointEntry{ + DataFederationPE: &akov2.DataFederationPE{EndpointID: "123", Provider: "foo", Type: "some"}, + }).Return(errors.New("failed to delete entry")) + return m + }, + project: &akov2.AtlasProject{}, + dataFederation: &akov2.AtlasDataFederation{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "mongodb.com/last-applied-configuration": ` +{ + "privateEndpoints": [ + { + "endpointId": "123", + "provider": "foo", + "type": "some" + } + ] + }`, + }, + }, + }, + wantOK: false, + wantConditions: []api.Condition{ + { + Type: "DataFederationPrivateEndpointsReady", + Status: "False", + Reason: "InternalError", + Message: "error deleting private endpoint: failed to delete entry", + }, + }, + }, } { t.Run(tc.name, func(t *testing.T) { ctx := &workflow.Context{ From 3a0f64a0b72c51c5778e8e1298bb950807d42555 Mon Sep 17 00:00:00 2001 From: Helder Santana Date: Thu, 14 Nov 2024 19:29:20 +0100 Subject: [PATCH 3/6] increment e2e test --- test/e2e/datafederation_pe_test.go | 47 +++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/test/e2e/datafederation_pe_test.go b/test/e2e/datafederation_pe_test.go index 96257414fd..0142795949 100644 --- a/test/e2e/datafederation_pe_test.go +++ b/test/e2e/datafederation_pe_test.go @@ -29,10 +29,11 @@ import ( // AWS NOTES: reserved VPC in eu-west-2, eu-south-1, us-east-1 (due to limitation no more 4 VPC per region) -var _ = Describe("UserLogin", Label("datafederation"), func() { +var _ = FDescribe("DataFederation Private Endpoint", Label("datafederation"), func() { var testData *model.TestDataProvider var providerAction cloud.Provider var pe *cloud.PrivateEndpointDetails + var secondPE *cloud.PrivateEndpointDetails _ = BeforeEach(OncePerOrdered, func() { checkUpAWSEnvironment() @@ -147,6 +148,50 @@ var _ = Describe("UserLogin", Label("datafederation"), func() { }).WithTimeout(2 * time.Minute).WithPolling(20 * time.Second).Should(BeTrue()) }) + //nolint:dupl + By("Create a new private endpoint in AWS", func() { + Expect(testData.K8SClient.Get(testData.Context, types.NamespacedName{Name: testData.Project.Name, + Namespace: testData.Resources.Namespace}, testData.Project)).To(Succeed()) + + vpcId := providerAction.SetupNetwork( + "AWS", + cloud.WithAWSConfig(&cloud.AWSConfig{ + VPC: utils.RandomName("datafederation-private-endpoint2"), + Region: config.AWSRegionEU, + EnableCleanup: true, + }), + ) + secondPE = providerAction.SetupPrivateEndpoint( + &cloud.AWSPrivateEndpointRequest{ + ID: "vpce-" + vpcId, + Region: config.AWSRegionEU, + // See https://www.mongodb.com/docs/atlas/reference/api-resources-spec/v2/#tag/Data-Federation/operation/createDataFederationPrivateEndpoint + ServiceName: "com.amazonaws.vpce.eu-west-2.vpce-svc-052f1840aa0c4f1f9", + }, + ) + }) + + By("Update DataFederation with the new Private Endpoint", func() { + df := &akov2.AtlasDataFederation{} + Expect(testData.K8SClient.Get(context.Background(), types.NamespacedName{ + Namespace: testData.Project.Namespace, + Name: dataFederationInstanceName, + }, df)).To(Succeed()) + df.Spec.PrivateEndpoints[0].EndpointID = secondPE.ID + Expect(testData.K8SClient.Update(context.Background(), df)).ShouldNot(HaveOccurred()) + }) + + By("Checking the DataFederation is ready", func() { + df := &akov2.AtlasDataFederation{} + Expect(testData.K8SClient.Get(context.Background(), types.NamespacedName{ + Namespace: testData.Project.Namespace, + Name: dataFederationInstanceName, + }, df)).To(Succeed()) + Eventually(func() bool { + return resources.CheckCondition(testData.K8SClient, df, api.TrueCondition(api.ReadyType)) + }).WithTimeout(2 * time.Minute).WithPolling(20 * time.Second).Should(BeTrue()) + }) + By("Delete DataFederation", func() { df := &akov2.AtlasDataFederation{} Expect(testData.K8SClient.Get(context.Background(), types.NamespacedName{ From 6718f05c010ca7aa6e13a6d9884417da2613d643 Mon Sep 17 00:00:00 2001 From: Helder Santana Date: Fri, 15 Nov 2024 12:22:57 +0100 Subject: [PATCH 4/6] remove programmatic focus --- test/e2e/datafederation_pe_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/datafederation_pe_test.go b/test/e2e/datafederation_pe_test.go index 0142795949..10c8a2f7ed 100644 --- a/test/e2e/datafederation_pe_test.go +++ b/test/e2e/datafederation_pe_test.go @@ -29,7 +29,7 @@ import ( // AWS NOTES: reserved VPC in eu-west-2, eu-south-1, us-east-1 (due to limitation no more 4 VPC per region) -var _ = FDescribe("DataFederation Private Endpoint", Label("datafederation"), func() { +var _ = Describe("DataFederation Private Endpoint", Label("datafederation"), func() { var testData *model.TestDataProvider var providerAction cloud.Provider var pe *cloud.PrivateEndpointDetails From 335ffd326e29961bd780e574a75f00f56aad3824 Mon Sep 17 00:00:00 2001 From: Helder Santana Date: Fri, 15 Nov 2024 13:19:15 +0100 Subject: [PATCH 5/6] fix test cleanup --- test/e2e/datafederation_pe_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/test/e2e/datafederation_pe_test.go b/test/e2e/datafederation_pe_test.go index 10c8a2f7ed..bb60cbc516 100644 --- a/test/e2e/datafederation_pe_test.go +++ b/test/e2e/datafederation_pe_test.go @@ -213,6 +213,13 @@ var _ = Describe("DataFederation Private Endpoint", Label("datafederation"), fun g.Expect(err).To(BeNil(), fmt.Sprintf("deletion of private endpoint failed with error %v", err)) g.Expect(resp).NotTo(BeNil()) g.Expect(resp.StatusCode).To(BeEquivalentTo(http.StatusNoContent)) + + _, resp, err = atlasClient.Client.DataFederationApi. + DeleteDataFederationPrivateEndpoint(testData.Context, testData.Project.ID(), secondPE.ID). + Execute() + g.Expect(err).To(BeNil(), fmt.Sprintf("deletion of private endpoint failed with error %v", err)) + g.Expect(resp).NotTo(BeNil()) + g.Expect(resp.StatusCode).To(BeEquivalentTo(http.StatusNoContent)) }).WithTimeout(5 * time.Minute).WithPolling(15 * time.Second).MustPassRepeatedly(2).Should(Succeed()) }) }) From 46a504c6fa326037607bcc51e1518c3ede092330 Mon Sep 17 00:00:00 2001 From: Helder Santana Date: Fri, 15 Nov 2024 14:28:28 +0100 Subject: [PATCH 6/6] fix test cleanup --- test/e2e/datafederation_pe_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/test/e2e/datafederation_pe_test.go b/test/e2e/datafederation_pe_test.go index bb60cbc516..c8bbac1b2b 100644 --- a/test/e2e/datafederation_pe_test.go +++ b/test/e2e/datafederation_pe_test.go @@ -208,13 +208,6 @@ var _ = Describe("DataFederation Private Endpoint", Label("datafederation"), fun // TODO: revisit and cleanup once CLOUDP-280905 is fixed Eventually(func(g Gomega) { _, resp, err := atlasClient.Client.DataFederationApi. - DeleteDataFederationPrivateEndpoint(testData.Context, testData.Project.ID(), pe.ID). - Execute() - g.Expect(err).To(BeNil(), fmt.Sprintf("deletion of private endpoint failed with error %v", err)) - g.Expect(resp).NotTo(BeNil()) - g.Expect(resp.StatusCode).To(BeEquivalentTo(http.StatusNoContent)) - - _, resp, err = atlasClient.Client.DataFederationApi. DeleteDataFederationPrivateEndpoint(testData.Context, testData.Project.ID(), secondPE.ID). Execute() g.Expect(err).To(BeNil(), fmt.Sprintf("deletion of private endpoint failed with error %v", err))