From eedac299e89966abe8175dabe53d33652e4db219 Mon Sep 17 00:00:00 2001 From: Milan Pavlik Date: Wed, 27 Jul 2022 09:06:56 +0000 Subject: [PATCH] [usage] Implement CollectUsage --- .../usage/pkg/{controller => apiv1}/pricer.go | 2 +- .../pkg/{controller => apiv1}/pricer_test.go | 2 +- components/usage/pkg/apiv1/usage-report.go | 126 ++++++++++ components/usage/pkg/apiv1/usage.go | 112 ++++++++- components/usage/pkg/apiv1/usage_test.go | 138 ++++++++++- components/usage/pkg/controller/reconciler.go | 226 ++---------------- .../usage/pkg/controller/reconciler_test.go | 164 ------------- components/usage/pkg/server/server.go | 14 +- 8 files changed, 404 insertions(+), 380 deletions(-) rename components/usage/pkg/{controller => apiv1}/pricer.go (98%) rename components/usage/pkg/{controller => apiv1}/pricer_test.go (98%) create mode 100644 components/usage/pkg/apiv1/usage-report.go delete mode 100644 components/usage/pkg/controller/reconciler_test.go diff --git a/components/usage/pkg/controller/pricer.go b/components/usage/pkg/apiv1/pricer.go similarity index 98% rename from components/usage/pkg/controller/pricer.go rename to components/usage/pkg/apiv1/pricer.go index cbd2291a59e699..8ab5e6df3f92ed 100644 --- a/components/usage/pkg/controller/pricer.go +++ b/components/usage/pkg/apiv1/pricer.go @@ -2,7 +2,7 @@ // Licensed under the GNU Affero General Public License (AGPL). // See License-AGPL.txt in the project root for license information. -package controller +package apiv1 import ( "fmt" diff --git a/components/usage/pkg/controller/pricer_test.go b/components/usage/pkg/apiv1/pricer_test.go similarity index 98% rename from components/usage/pkg/controller/pricer_test.go rename to components/usage/pkg/apiv1/pricer_test.go index 1499bd06be5af5..1785af30e76e2f 100644 --- a/components/usage/pkg/controller/pricer_test.go +++ b/components/usage/pkg/apiv1/pricer_test.go @@ -2,7 +2,7 @@ // Licensed under the GNU Affero General Public License (AGPL). // See License-AGPL.txt in the project root for license information. -package controller +package apiv1 import ( "testing" diff --git a/components/usage/pkg/apiv1/usage-report.go b/components/usage/pkg/apiv1/usage-report.go new file mode 100644 index 00000000000000..29b9a856e0e931 --- /dev/null +++ b/components/usage/pkg/apiv1/usage-report.go @@ -0,0 +1,126 @@ +// Copyright (c) 2022 Gitpod GmbH. All rights reserved. +// Licensed under the GNU Affero General Public License (AGPL). +// See License-AGPL.txt in the project root for license information. + +package apiv1 + +import ( + "context" + "fmt" + "github.com/gitpod-io/gitpod/common-go/log" + "github.com/gitpod-io/gitpod/usage/pkg/db" + "gorm.io/gorm" + "time" +) + +type InvalidSession struct { + Reason string + Session db.WorkspaceInstanceForUsage +} + +type UsageReport struct { + GenerationTime time.Time + + From time.Time + To time.Time + + RawSessions []db.WorkspaceInstanceForUsage + InvalidSessions []InvalidSession + + UsageRecords []db.WorkspaceInstanceUsage +} + +func NewReportGenerator(conn *gorm.DB, pricer *WorkspacePricer) *ReportGenerator { + return &ReportGenerator{ + conn: conn, + pricer: pricer, + nowFunc: time.Now, + } +} + +type ReportGenerator struct { + conn *gorm.DB + pricer *WorkspacePricer + nowFunc func() time.Time +} + +func (g *ReportGenerator) GenerateUsageReport(ctx context.Context, from, to time.Time) (UsageReport, error) { + now := g.nowFunc().UTC() + log.Infof("Gathering usage data from %s to %s", from, to) + + report := UsageReport{ + GenerationTime: now, + From: from, + To: to, + } + + instances, err := db.ListWorkspaceInstancesInRange(ctx, g.conn, from, to) + if err != nil { + return report, fmt.Errorf("failed to list instances from db: %w", err) + } + report.RawSessions = instances + + valid, invalid := validateInstances(instances) + report.InvalidSessions = invalid + + if len(invalid) > 0 { + log.WithField("invalid_workspace_instances", invalid).Errorf("Detected %d invalid instances. These will be skipped in the current run.", len(invalid)) + } + log.WithField("workspace_instances", instances).Debug("Successfully loaded workspace instances.") + + trimmed := trimStartStopTime(valid, from, to) + + report.UsageRecords = instancesToUsageRecords(trimmed, g.pricer, now) + return report, nil +} + +func validateInstances(instances []db.WorkspaceInstanceForUsage) (valid []db.WorkspaceInstanceForUsage, invalid []InvalidSession) { + for _, i := range instances { + // i is a pointer to the current element, we need to assign it to ensure we're copying the value, not the current pointer. + instance := i + + // Each instance must have a start time, without it, we do not have a baseline for usage computation. + if !instance.CreationTime.IsSet() { + invalid = append(invalid, InvalidSession{ + Reason: "missing creation time", + Session: instance, + }) + continue + } + + start := instance.CreationTime.Time() + + // Currently running instances do not have a stopped time set, so we ignore these. + if instance.StoppedTime.IsSet() { + stop := instance.StoppedTime.Time() + if stop.Before(start) { + invalid = append(invalid, InvalidSession{ + Reason: "stop time is before start time", + Session: instance, + }) + continue + } + } + + valid = append(valid, instance) + } + return valid, invalid +} + +// trimStartStopTime ensures that start time or stop time of an instance is never outside of specified start or stop time range. +func trimStartStopTime(instances []db.WorkspaceInstanceForUsage, maximumStart, minimumStop time.Time) []db.WorkspaceInstanceForUsage { + var updated []db.WorkspaceInstanceForUsage + + for _, instance := range instances { + if instance.CreationTime.Time().Before(maximumStart) { + instance.CreationTime = db.NewVarcharTime(maximumStart) + } + + if instance.StoppedTime.Time().After(minimumStop) { + instance.StoppedTime = db.NewVarcharTime(minimumStop) + } + + updated = append(updated, instance) + } + return updated +} diff --git a/components/usage/pkg/apiv1/usage.go b/components/usage/pkg/apiv1/usage.go index 1afe48477bcb70..0154fe447788f6 100644 --- a/components/usage/pkg/apiv1/usage.go +++ b/components/usage/pkg/apiv1/usage.go @@ -5,8 +5,11 @@ package apiv1 import ( - context "context" + "context" + "database/sql" + "fmt" "github.com/gitpod-io/gitpod/common-go/log" + "github.com/gitpod-io/gitpod/usage/pkg/contentservice" "time" v1 "github.com/gitpod-io/gitpod/usage-api/v1" @@ -21,12 +24,17 @@ var _ v1.UsageServiceServer = (*UsageService)(nil) type UsageService struct { conn *gorm.DB + + contentService contentservice.Interface + + reportGenerator *ReportGenerator + v1.UnimplementedUsageServiceServer } const maxQuerySize = 31 * 24 * time.Hour -func (us *UsageService) ListBilledUsage(ctx context.Context, in *v1.ListBilledUsageRequest) (*v1.ListBilledUsageResponse, error) { +func (s *UsageService) ListBilledUsage(ctx context.Context, in *v1.ListBilledUsageRequest) (*v1.ListBilledUsageResponse, error) { to := time.Now() if in.To != nil { to = in.To.AsTime() @@ -52,7 +60,7 @@ func (us *UsageService) ListBilledUsage(ctx context.Context, in *v1.ListBilledUs order = db.DescendingOrder } - usageRecords, err := db.ListUsage(ctx, us.conn, db.AttributionID(in.GetAttributionId()), from, to, order) + usageRecords, err := db.ListUsage(ctx, s.conn, db.AttributionID(in.GetAttributionId()), from, to, order) if err != nil { log.Log. WithField("attribution_id", in.AttributionId). @@ -88,6 +96,100 @@ func (us *UsageService) ListBilledUsage(ctx context.Context, in *v1.ListBilledUs }, nil } -func NewUsageService(conn *gorm.DB) *UsageService { - return &UsageService{conn: conn} +func (s *UsageService) ReconcileUsage(ctx context.Context, req *v1.ReconcileUsageRequest) (*v1.ReconcileUsageResponse, error) { + from := req.GetStartTime().AsTime() + to := req.GetEndTime().AsTime() + + if to.Before(from) { + return nil, status.Errorf(codes.InvalidArgument, "End time must be after start time") + } + + report, err := s.reportGenerator.GenerateUsageReport(ctx, from, to) + if err != nil { + log.Log.WithError(err).Error("Failed to reconcile time range.") + return nil, status.Error(codes.Internal, "failed to reconcile time range") + } + + err = db.CreateUsageRecords(ctx, s.conn, report.UsageRecords) + if err != nil { + log.Log.WithError(err).Error("Failed to persist usage records.") + return nil, status.Error(codes.Internal, "failed to persist usage records") + } + + filename := fmt.Sprintf("%s.gz", time.Now().Format(time.RFC3339)) + err = s.contentService.UploadUsageReport(ctx, filename, report.UsageRecords) + if err != nil { + log.Log.WithError(err).Error("Failed to persist usage report to content service.") + return nil, status.Error(codes.Internal, "failed to persist usage report to content service") + } + + var sessions []*v1.BilledSession + for _, instance := range report.UsageRecords { + sessions = append(sessions, usageRecordToBilledUsageProto(instance)) + } + + return &v1.ReconcileUsageResponse{ + Sessions: sessions, + }, nil + +} + +func NewUsageService(conn *gorm.DB, reportGenerator *ReportGenerator, contentSvc contentservice.Interface) *UsageService { + return &UsageService{ + conn: conn, + reportGenerator: reportGenerator, + contentService: contentSvc, + } +} + +func usageRecordToBilledUsageProto(usageRecord db.WorkspaceInstanceUsage) *v1.BilledSession { + var endTime *timestamppb.Timestamp + if usageRecord.StoppedAt.Valid { + endTime = timestamppb.New(usageRecord.StoppedAt.Time) + } + return &v1.BilledSession{ + AttributionId: string(usageRecord.AttributionID), + UserId: usageRecord.UserID.String(), + WorkspaceId: usageRecord.WorkspaceID, + TeamId: "", + WorkspaceType: string(usageRecord.WorkspaceType), + ProjectId: usageRecord.ProjectID, + InstanceId: usageRecord.InstanceID.String(), + WorkspaceClass: usageRecord.WorkspaceClass, + StartTime: timestamppb.New(usageRecord.StartedAt), + EndTime: endTime, + Credits: usageRecord.CreditsUsed, + } +} + +func instancesToUsageRecords(instances []db.WorkspaceInstanceForUsage, pricer *WorkspacePricer, now time.Time) []db.WorkspaceInstanceUsage { + var usageRecords []db.WorkspaceInstanceUsage + + for _, instance := range instances { + var stoppedAt sql.NullTime + if instance.StoppedTime.IsSet() { + stoppedAt = sql.NullTime{Time: instance.StoppedTime.Time(), Valid: true} + } + + projectID := "" + if instance.ProjectID.Valid { + projectID = instance.ProjectID.String + } + + usageRecords = append(usageRecords, db.WorkspaceInstanceUsage{ + InstanceID: instance.ID, + AttributionID: instance.UsageAttributionID, + WorkspaceID: instance.WorkspaceID, + ProjectID: projectID, + UserID: instance.OwnerID, + WorkspaceType: instance.Type, + WorkspaceClass: instance.WorkspaceClass, + StartedAt: instance.CreationTime.Time(), + StoppedAt: stoppedAt, + CreditsUsed: pricer.CreditsUsedByInstance(&instance, now), + GenerationID: 0, + }) + } + + return usageRecords } diff --git a/components/usage/pkg/apiv1/usage_test.go b/components/usage/pkg/apiv1/usage_test.go index f6c20cf741dc85..622e97b2a08643 100644 --- a/components/usage/pkg/apiv1/usage_test.go +++ b/components/usage/pkg/apiv1/usage_test.go @@ -142,7 +142,8 @@ func TestUsageService_ListBilledUsage(t *testing.T) { baseserver.WithGRPC(baseserver.MustUseRandomLocalAddress(t)), ) - v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn)) + generator := NewReportGenerator(dbconn, DefaultWorkspacePricer) + v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, generator, nil)) baseserver.StartServerForTests(t, srv) conn, err := grpc.Dial(srv.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -166,3 +167,138 @@ func TestUsageService_ListBilledUsage(t *testing.T) { }) } } + +func TestInstanceToUsageRecords(t *testing.T) { + maxStopTime := time.Date(2022, 05, 31, 23, 00, 00, 00, time.UTC) + teamID, ownerID, projectID := uuid.New().String(), uuid.New(), uuid.New() + workspaceID := dbtest.GenerateWorkspaceID() + teamAttributionID := db.NewTeamAttributionID(teamID) + instanceId := uuid.New() + creationTime := db.NewVarcharTime(time.Date(2022, 05, 30, 00, 00, 00, 00, time.UTC)) + stoppedTime := db.NewVarcharTime(time.Date(2022, 06, 1, 1, 0, 0, 0, time.UTC)) + + scenarios := []struct { + Name string + Records []db.WorkspaceInstanceForUsage + Expected []db.WorkspaceInstanceUsage + }{ + { + Name: "a stopped workspace instance", + Records: []db.WorkspaceInstanceForUsage{ + { + ID: instanceId, + WorkspaceID: workspaceID, + OwnerID: ownerID, + ProjectID: sql.NullString{}, + WorkspaceClass: defaultWorkspaceClass, + Type: db.WorkspaceType_Prebuild, + UsageAttributionID: teamAttributionID, + CreationTime: creationTime, + StoppedTime: stoppedTime, + }, + }, + Expected: []db.WorkspaceInstanceUsage{{ + InstanceID: instanceId, + AttributionID: teamAttributionID, + UserID: ownerID, + WorkspaceID: workspaceID, + ProjectID: "", + WorkspaceType: db.WorkspaceType_Prebuild, + WorkspaceClass: defaultWorkspaceClass, + CreditsUsed: 470, + StartedAt: creationTime.Time(), + StoppedAt: sql.NullTime{Time: stoppedTime.Time(), Valid: true}, + GenerationID: 0, + Deleted: false, + }}, + }, + { + Name: "workspace instance that is still running", + Records: []db.WorkspaceInstanceForUsage{ + { + ID: instanceId, + OwnerID: ownerID, + ProjectID: sql.NullString{String: projectID.String(), Valid: true}, + WorkspaceClass: defaultWorkspaceClass, + Type: db.WorkspaceType_Regular, + WorkspaceID: workspaceID, + UsageAttributionID: teamAttributionID, + CreationTime: creationTime, + StoppedTime: db.VarcharTime{}, + }, + }, + Expected: []db.WorkspaceInstanceUsage{{ + InstanceID: instanceId, + AttributionID: teamAttributionID, + UserID: ownerID, + ProjectID: projectID.String(), + WorkspaceID: workspaceID, + WorkspaceType: db.WorkspaceType_Regular, + StartedAt: creationTime.Time(), + StoppedAt: sql.NullTime{}, + WorkspaceClass: defaultWorkspaceClass, + CreditsUsed: 470, + }}, + }, + } + + for _, s := range scenarios { + t.Run(s.Name, func(t *testing.T) { + actual := instancesToUsageRecords(s.Records, DefaultWorkspacePricer, maxStopTime) + require.Equal(t, s.Expected, actual) + }) + } +} + +func TestReportGenerator_GenerateUsageReport(t *testing.T) { + startOfMay := time.Date(2022, 05, 1, 0, 00, 00, 00, time.UTC) + startOfJune := time.Date(2022, 06, 1, 0, 00, 00, 00, time.UTC) + + teamID := uuid.New() + scenarioRunTime := time.Date(2022, 05, 31, 23, 00, 00, 00, time.UTC) + + instances := []db.WorkspaceInstance{ + // Ran throughout the reconcile period + dbtest.NewWorkspaceInstance(t, db.WorkspaceInstance{ + ID: uuid.New(), + UsageAttributionID: db.NewTeamAttributionID(teamID.String()), + CreationTime: db.NewVarcharTime(time.Date(2022, 05, 1, 00, 00, 00, 00, time.UTC)), + StartedTime: db.NewVarcharTime(time.Date(2022, 05, 1, 00, 00, 00, 00, time.UTC)), + StoppedTime: db.NewVarcharTime(time.Date(2022, 06, 1, 1, 0, 0, 0, time.UTC)), + }), + // Still running + dbtest.NewWorkspaceInstance(t, db.WorkspaceInstance{ + ID: uuid.New(), + UsageAttributionID: db.NewTeamAttributionID(teamID.String()), + CreationTime: db.NewVarcharTime(time.Date(2022, 05, 30, 00, 00, 00, 00, time.UTC)), + StartedTime: db.NewVarcharTime(time.Date(2022, 05, 30, 00, 00, 00, 00, time.UTC)), + }), + // No creation time, invalid record + dbtest.NewWorkspaceInstance(t, db.WorkspaceInstance{ + ID: uuid.New(), + UsageAttributionID: db.NewTeamAttributionID(teamID.String()), + StartedTime: db.NewVarcharTime(time.Date(2022, 06, 1, 1, 0, 0, 0, time.UTC)), + StoppedTime: db.NewVarcharTime(time.Date(2022, 06, 1, 1, 0, 0, 0, time.UTC)), + }), + } + + conn := dbtest.ConnectForTests(t) + dbtest.CreateWorkspaceInstances(t, conn, instances...) + + nowFunc := func() time.Time { return scenarioRunTime } + generator := &ReportGenerator{ + nowFunc: nowFunc, + conn: conn, + pricer: DefaultWorkspacePricer, + } + + report, err := generator.GenerateUsageReport(context.Background(), startOfMay, startOfJune) + require.NoError(t, err) + + require.Equal(t, nowFunc(), report.GenerationTime) + require.Equal(t, startOfMay, report.From) + require.Equal(t, startOfJune, report.To) + require.Len(t, report.RawSessions, 3) + require.Len(t, report.InvalidSessions, 1) + require.Len(t, report.UsageRecords, 2) +} diff --git a/components/usage/pkg/controller/reconciler.go b/components/usage/pkg/controller/reconciler.go index 799bbe5312fa95..55e2cc10e08229 100644 --- a/components/usage/pkg/controller/reconciler.go +++ b/components/usage/pkg/controller/reconciler.go @@ -6,17 +6,10 @@ package controller import ( "context" - "database/sql" "fmt" v1 "github.com/gitpod-io/gitpod/usage-api/v1" "google.golang.org/protobuf/types/known/timestamppb" "time" - - "github.com/gitpod-io/gitpod/common-go/log" - "github.com/gitpod-io/gitpod/usage/pkg/contentservice" - "github.com/gitpod-io/gitpod/usage/pkg/db" - "github.com/google/uuid" - "gorm.io/gorm" ) type Reconciler interface { @@ -29,35 +22,24 @@ func (f ReconcilerFunc) Reconcile() error { return f() } -type UsageReconciler struct { - nowFunc func() time.Time - conn *gorm.DB - pricer *WorkspacePricer - billingService v1.BillingServiceClient - contentService contentservice.Interface -} - -func NewUsageReconciler(conn *gorm.DB, pricer *WorkspacePricer, billingClient v1.BillingServiceClient, contentService contentservice.Interface) *UsageReconciler { - return &UsageReconciler{ - conn: conn, - pricer: pricer, - billingService: billingClient, - contentService: contentService, - nowFunc: time.Now, +func NewUsageAndBillingReconciler(usageClient v1.UsageServiceClient, billingClient v1.BillingServiceClient) *UsageAndBillingReconciler { + return &UsageAndBillingReconciler{ + nowFunc: time.Now, + usageClient: usageClient, + billingClient: billingClient, } } -type UsageReconcileStatus struct { - StartTime time.Time - EndTime time.Time +type UsageAndBillingReconciler struct { + nowFunc func() time.Time - WorkspaceInstances int - InvalidWorkspaceInstances int + usageClient v1.UsageServiceClient + billingClient v1.BillingServiceClient } -func (u *UsageReconciler) Reconcile() (err error) { +func (r *UsageAndBillingReconciler) Reconcile() (err error) { ctx := context.Background() - now := time.Now().UTC() + now := r.nowFunc().UTC() reportUsageReconcileStarted() defer func() { @@ -67,186 +49,24 @@ func (u *UsageReconciler) Reconcile() (err error) { startOfCurrentMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC) startOfNextMonth := startOfCurrentMonth.AddDate(0, 1, 0) - status, report, err := u.ReconcileTimeRange(ctx, startOfCurrentMonth, startOfNextMonth) - if err != nil { - return err - } - log.WithField("usage_reconcile_status", status).Info("Reconcile completed.") - - err = db.CreateUsageRecords(ctx, u.conn, report) - if err != nil { - return fmt.Errorf("failed to write usage records to database: %s", err) - } - - filename := fmt.Sprintf("%s.gz", now.Format(time.RFC3339)) - err = u.contentService.UploadUsageReport(ctx, filename, report) - if err != nil { - return fmt.Errorf("failed to upload usage report: %w", err) - } - - return nil -} - -func (u *UsageReconciler) ReconcileTimeRange(ctx context.Context, from, to time.Time) (*UsageReconcileStatus, UsageReport, error) { - now := u.nowFunc().UTC() - log.Infof("Gathering usage data from %s to %s", from, to) - status := &UsageReconcileStatus{ - StartTime: from, - EndTime: to, - } - instances, invalidInstances, err := u.loadWorkspaceInstances(ctx, from, to) - if err != nil { - return nil, nil, fmt.Errorf("failed to load workspace instances: %w", err) - } - status.WorkspaceInstances = len(instances) - status.InvalidWorkspaceInstances = len(invalidInstances) - - if len(invalidInstances) > 0 { - log.WithField("invalid_workspace_instances", invalidInstances).Errorf("Detected %d invalid instances. These will be skipped in the current run.", len(invalidInstances)) - } - log.WithField("workspace_instances", instances).Debug("Successfully loaded workspace instances.") - - usageRecords := instancesToUsageRecords(instances, u.pricer, now) - - _, err = u.billingService.UpdateInvoices(ctx, &v1.UpdateInvoicesRequest{ - StartTime: timestamppb.New(from), - EndTime: timestamppb.New(to), - Sessions: instancesToBilledSessions(usageRecords), + usageResp, err := r.usageClient.ReconcileUsage(ctx, &v1.ReconcileUsageRequest{ + StartTime: timestamppb.New(startOfCurrentMonth), + EndTime: timestamppb.New(startOfNextMonth), }) if err != nil { - return nil, nil, fmt.Errorf("failed to update invoices: %w", err) + return fmt.Errorf("failed to reconcile usage: %w", err) } - return status, usageRecords, nil -} - -func instancesToUsageRecords(instances []db.WorkspaceInstanceForUsage, pricer *WorkspacePricer, now time.Time) []db.WorkspaceInstanceUsage { - var usageRecords []db.WorkspaceInstanceUsage - - for _, instance := range instances { - var stoppedAt sql.NullTime - if instance.StoppedTime.IsSet() { - stoppedAt = sql.NullTime{Time: instance.StoppedTime.Time(), Valid: true} - } - - projectID := "" - if instance.ProjectID.Valid { - projectID = instance.ProjectID.String - } - - usageRecords = append(usageRecords, db.WorkspaceInstanceUsage{ - InstanceID: instance.ID, - AttributionID: instance.UsageAttributionID, - WorkspaceID: instance.WorkspaceID, - ProjectID: projectID, - UserID: instance.OwnerID, - WorkspaceType: instance.Type, - WorkspaceClass: instance.WorkspaceClass, - StartedAt: instance.CreationTime.Time(), - StoppedAt: stoppedAt, - CreditsUsed: pricer.CreditsUsedByInstance(&instance, now), - GenerationID: 0, - }) - } + sessions := usageResp.GetSessions() - return usageRecords -} - -func instancesToBilledSessions(instances []db.WorkspaceInstanceUsage) []*v1.BilledSession { - var sessions []*v1.BilledSession - - for _, instance := range instances { - var endTime *timestamppb.Timestamp - - if instance.StoppedAt.Valid { - endTime = timestamppb.New(instance.StoppedAt.Time) - } - - sessions = append(sessions, &v1.BilledSession{ - AttributionId: string(instance.AttributionID), - UserId: instance.UserID.String(), - TeamId: "", - WorkspaceId: instance.WorkspaceID, - WorkspaceType: string(instance.WorkspaceType), - ProjectId: instance.ProjectID, - InstanceId: instance.InstanceID.String(), - WorkspaceClass: instance.WorkspaceClass, - StartTime: timestamppb.New(instance.StartedAt), - EndTime: endTime, - Credits: instance.CreditsUsed, - }) - } - - return sessions -} - -type UsageReport []db.WorkspaceInstanceUsage - -type invalidWorkspaceInstance struct { - reason string - workspaceInstanceID uuid.UUID -} - -func (u *UsageReconciler) loadWorkspaceInstances(ctx context.Context, from, to time.Time) ([]db.WorkspaceInstanceForUsage, []invalidWorkspaceInstance, error) { - log.Infof("Gathering usage data from %s to %s", from, to) - instances, err := db.ListWorkspaceInstancesInRange(ctx, u.conn, from, to) + _, err = r.billingClient.UpdateInvoices(ctx, &v1.UpdateInvoicesRequest{ + StartTime: timestamppb.New(startOfCurrentMonth), + EndTime: timestamppb.New(startOfNextMonth), + Sessions: sessions, + }) if err != nil { - return nil, nil, fmt.Errorf("failed to list instances from db: %w", err) + return fmt.Errorf("failed to update invoices: %w", err) } - log.Infof("Identified %d instances between %s and %s", len(instances), from, to) - valid, invalid := validateInstances(instances) - trimmed := trimStartStopTime(valid, from, to) - return trimmed, invalid, nil -} - -func validateInstances(instances []db.WorkspaceInstanceForUsage) (valid []db.WorkspaceInstanceForUsage, invalid []invalidWorkspaceInstance) { - for _, i := range instances { - // i is a pointer to the current element, we need to assign it to ensure we're copying the value, not the current pointer. - instance := i - - // Each instance must have a start time, without it, we do not have a baseline for usage computation. - if !instance.CreationTime.IsSet() { - invalid = append(invalid, invalidWorkspaceInstance{ - reason: "missing creation time", - workspaceInstanceID: instance.ID, - }) - continue - } - - start := instance.CreationTime.Time() - - // Currently running instances do not have a stopped time set, so we ignore these. - if instance.StoppedTime.IsSet() { - stop := instance.StoppedTime.Time() - if stop.Before(start) { - invalid = append(invalid, invalidWorkspaceInstance{ - reason: "stop time is before start time", - workspaceInstanceID: instance.ID, - }) - continue - } - } - - valid = append(valid, instance) - } - return valid, invalid -} - -// trimStartStopTime ensures that start time or stop time of an instance is never outside of specified start or stop time range. -func trimStartStopTime(instances []db.WorkspaceInstanceForUsage, maximumStart, minimumStop time.Time) []db.WorkspaceInstanceForUsage { - var updated []db.WorkspaceInstanceForUsage - - for _, instance := range instances { - if instance.CreationTime.Time().Before(maximumStart) { - instance.CreationTime = db.NewVarcharTime(maximumStart) - } - - if instance.StoppedTime.Time().After(minimumStop) { - instance.StoppedTime = db.NewVarcharTime(minimumStop) - } - - updated = append(updated, instance) - } - return updated + return nil } diff --git a/components/usage/pkg/controller/reconciler_test.go b/components/usage/pkg/controller/reconciler_test.go deleted file mode 100644 index 249020db43852c..00000000000000 --- a/components/usage/pkg/controller/reconciler_test.go +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright (c) 2022 Gitpod GmbH. All rights reserved. -// Licensed under the GNU Affero General Public License (AGPL). -// See License-AGPL.txt in the project root for license information. - -package controller - -import ( - "context" - "database/sql" - v1 "github.com/gitpod-io/gitpod/usage-api/v1" - "google.golang.org/grpc" - "testing" - "time" - - "github.com/gitpod-io/gitpod/usage/pkg/db" - "github.com/gitpod-io/gitpod/usage/pkg/db/dbtest" - "github.com/google/uuid" - "github.com/stretchr/testify/require" -) - -func TestUsageReconciler_ReconcileTimeRange(t *testing.T) { - startOfMay := time.Date(2022, 05, 1, 0, 00, 00, 00, time.UTC) - startOfJune := time.Date(2022, 06, 1, 0, 00, 00, 00, time.UTC) - - teamID := uuid.New() - scenarioRunTime := time.Date(2022, 05, 31, 23, 00, 00, 00, time.UTC) - - instances := []db.WorkspaceInstance{ - // Ran throughout the reconcile period - dbtest.NewWorkspaceInstance(t, db.WorkspaceInstance{ - ID: uuid.New(), - UsageAttributionID: db.NewTeamAttributionID(teamID.String()), - CreationTime: db.NewVarcharTime(time.Date(2022, 05, 1, 00, 00, 00, 00, time.UTC)), - StartedTime: db.NewVarcharTime(time.Date(2022, 05, 1, 00, 00, 00, 00, time.UTC)), - StoppedTime: db.NewVarcharTime(time.Date(2022, 06, 1, 1, 0, 0, 0, time.UTC)), - }), - // Still running - dbtest.NewWorkspaceInstance(t, db.WorkspaceInstance{ - ID: uuid.New(), - UsageAttributionID: db.NewTeamAttributionID(teamID.String()), - CreationTime: db.NewVarcharTime(time.Date(2022, 05, 30, 00, 00, 00, 00, time.UTC)), - StartedTime: db.NewVarcharTime(time.Date(2022, 05, 30, 00, 00, 00, 00, time.UTC)), - }), - // No creation time, invalid record - dbtest.NewWorkspaceInstance(t, db.WorkspaceInstance{ - ID: uuid.New(), - UsageAttributionID: db.NewTeamAttributionID(teamID.String()), - StartedTime: db.NewVarcharTime(time.Date(2022, 06, 1, 1, 0, 0, 0, time.UTC)), - StoppedTime: db.NewVarcharTime(time.Date(2022, 06, 1, 1, 0, 0, 0, time.UTC)), - }), - } - - conn := dbtest.ConnectForTests(t) - dbtest.CreateWorkspaceInstances(t, conn, instances...) - - reconciler := &UsageReconciler{ - billingService: &NoOpBillingServiceClient{}, - nowFunc: func() time.Time { return scenarioRunTime }, - conn: conn, - pricer: DefaultWorkspacePricer, - } - status, report, err := reconciler.ReconcileTimeRange(context.Background(), startOfMay, startOfJune) - require.NoError(t, err) - - require.Len(t, report, 2) - require.Equal(t, &UsageReconcileStatus{ - StartTime: startOfMay, - EndTime: startOfJune, - WorkspaceInstances: 2, - InvalidWorkspaceInstances: 1, - }, status) -} - -type NoOpBillingServiceClient struct{} - -func (c *NoOpBillingServiceClient) UpdateInvoices(ctx context.Context, in *v1.UpdateInvoicesRequest, opts ...grpc.CallOption) (*v1.UpdateInvoicesResponse, error) { - return &v1.UpdateInvoicesResponse{}, nil -} - -func (c *NoOpBillingServiceClient) GetLatestInvoice(ctx context.Context, in *v1.GetLatestInvoiceRequest, opts ...grpc.CallOption) (*v1.GetLatestInvoiceResponse, error) { - return &v1.GetLatestInvoiceResponse{}, nil -} - -func TestInstanceToUsageRecords(t *testing.T) { - maxStopTime := time.Date(2022, 05, 31, 23, 00, 00, 00, time.UTC) - teamID, ownerID, projectID := uuid.New().String(), uuid.New(), uuid.New() - workspaceID := dbtest.GenerateWorkspaceID() - teamAttributionID := db.NewTeamAttributionID(teamID) - instanceId := uuid.New() - creationTime := db.NewVarcharTime(time.Date(2022, 05, 30, 00, 00, 00, 00, time.UTC)) - stoppedTime := db.NewVarcharTime(time.Date(2022, 06, 1, 1, 0, 0, 0, time.UTC)) - - scenarios := []struct { - Name string - Records []db.WorkspaceInstanceForUsage - Expected []db.WorkspaceInstanceUsage - }{ - { - Name: "a stopped workspace instance", - Records: []db.WorkspaceInstanceForUsage{ - { - ID: instanceId, - WorkspaceID: workspaceID, - OwnerID: ownerID, - ProjectID: sql.NullString{}, - WorkspaceClass: defaultWorkspaceClass, - Type: db.WorkspaceType_Prebuild, - UsageAttributionID: teamAttributionID, - CreationTime: creationTime, - StoppedTime: stoppedTime, - }, - }, - Expected: []db.WorkspaceInstanceUsage{{ - InstanceID: instanceId, - AttributionID: teamAttributionID, - UserID: ownerID, - WorkspaceID: workspaceID, - ProjectID: "", - WorkspaceType: db.WorkspaceType_Prebuild, - WorkspaceClass: defaultWorkspaceClass, - CreditsUsed: 470, - StartedAt: creationTime.Time(), - StoppedAt: sql.NullTime{Time: stoppedTime.Time(), Valid: true}, - GenerationID: 0, - Deleted: false, - }}, - }, - { - Name: "workspace instance that is still running", - Records: []db.WorkspaceInstanceForUsage{ - { - ID: instanceId, - OwnerID: ownerID, - ProjectID: sql.NullString{String: projectID.String(), Valid: true}, - WorkspaceClass: defaultWorkspaceClass, - Type: db.WorkspaceType_Regular, - WorkspaceID: workspaceID, - UsageAttributionID: teamAttributionID, - CreationTime: creationTime, - StoppedTime: db.VarcharTime{}, - }, - }, - Expected: []db.WorkspaceInstanceUsage{{ - InstanceID: instanceId, - AttributionID: teamAttributionID, - UserID: ownerID, - ProjectID: projectID.String(), - WorkspaceID: workspaceID, - WorkspaceType: db.WorkspaceType_Regular, - StartedAt: creationTime.Time(), - StoppedAt: sql.NullTime{}, - WorkspaceClass: defaultWorkspaceClass, - CreditsUsed: 470, - }}, - }, - } - - for _, s := range scenarios { - t.Run(s.Name, func(t *testing.T) { - actual := instancesToUsageRecords(s.Records, DefaultWorkspacePricer, maxStopTime) - require.Equal(t, s.Expected, actual) - }) - } -} diff --git a/components/usage/pkg/server/server.go b/components/usage/pkg/server/server.go index 3d75723551c9b8..ea445bc4332d0a 100644 --- a/components/usage/pkg/server/server.go +++ b/components/usage/pkg/server/server.go @@ -65,7 +65,7 @@ func Start(cfg Config) error { return fmt.Errorf("failed to create self-connection to grpc server: %w", err) } - pricer, err := controller.NewWorkspacePricer(cfg.CreditsPerMinuteByWorkspaceClass) + pricer, err := apiv1.NewWorkspacePricer(cfg.CreditsPerMinuteByWorkspaceClass) if err != nil { return fmt.Errorf("failed to create workspace pricer: %w", err) } @@ -95,7 +95,11 @@ func Start(cfg Config) error { contentService = contentservice.New(cfg.ContentServiceAddress) } - ctrl, err := controller.New(schedule, controller.NewUsageReconciler(conn, pricer, v1.NewBillingServiceClient(selfConnection), contentService)) + reportGenerator := apiv1.NewReportGenerator(conn, pricer) + ctrl, err := controller.New(schedule, controller.NewUsageAndBillingReconciler( + v1.NewUsageServiceClient(selfConnection), + v1.NewBillingServiceClient(selfConnection), + )) if err != nil { return fmt.Errorf("failed to initialize usage controller: %w", err) } @@ -106,7 +110,7 @@ func Start(cfg Config) error { } defer ctrl.Stop() - err = registerGRPCServices(srv, conn, stripeClient) + err = registerGRPCServices(srv, conn, stripeClient, reportGenerator, contentService) if err != nil { return fmt.Errorf("failed to register gRPC services: %w", err) } @@ -124,8 +128,8 @@ func Start(cfg Config) error { return nil } -func registerGRPCServices(srv *baseserver.Server, conn *gorm.DB, stripeClient *stripe.Client) error { - v1.RegisterUsageServiceServer(srv.GRPC(), apiv1.NewUsageService(conn)) +func registerGRPCServices(srv *baseserver.Server, conn *gorm.DB, stripeClient *stripe.Client, reportGenerator *apiv1.ReportGenerator, contentSvc contentservice.Interface) error { + v1.RegisterUsageServiceServer(srv.GRPC(), apiv1.NewUsageService(conn, reportGenerator, contentSvc)) if stripeClient == nil { v1.RegisterBillingServiceServer(srv.GRPC(), &apiv1.BillingServiceNoop{}) } else {