-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[usage] Refactor controller package into scheduler #12831
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
// Copyright (c) 2022 Gitpod GmbH. All rights reserved. | ||
// Licensed under the GNU Affero General Public License (AGPL). | ||
// See License-AGPL.txt in the project root for license information. | ||
|
||
package scheduler | ||
|
||
import ( | ||
"context" | ||
v1 "github.com/gitpod-io/gitpod/usage-api/v1" | ||
"github.com/stretchr/testify/require" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
"sync" | ||
"sync/atomic" | ||
"testing" | ||
"time" | ||
) | ||
|
||
func TestLedgerJob_PreventsConcurrentInvocations(t *testing.T) { | ||
client := &fakeUsageClient{} | ||
job := NewLedgerTrigger(client, nil) | ||
|
||
invocations := 3 | ||
wg := sync.WaitGroup{} | ||
wg.Add(invocations) | ||
for i := 0; i < invocations; i++ { | ||
go func() { | ||
_ = job.Run() | ||
wg.Done() | ||
}() | ||
} | ||
wg.Wait() | ||
|
||
require.Equal(t, 1, int(client.ReconcileUsageWithLedgerCallCount)) | ||
} | ||
|
||
func TestLedgerJob_CanRunRepeatedly(t *testing.T) { | ||
client := &fakeUsageClient{} | ||
job := NewLedgerTrigger(client, nil) | ||
|
||
_ = job.Run() | ||
_ = job.Run() | ||
|
||
require.Equal(t, 2, int(client.ReconcileUsageWithLedgerCallCount)) | ||
} | ||
|
||
type fakeUsageClient struct { | ||
ReconcileUsageWithLedgerCallCount int32 | ||
} | ||
|
||
// GetCostCenter retrieves the active cost center for the given attributionID | ||
func (c *fakeUsageClient) GetCostCenter(ctx context.Context, in *v1.GetCostCenterRequest, opts ...grpc.CallOption) (*v1.GetCostCenterResponse, error) { | ||
return nil, status.Error(codes.Unauthenticated, "not implemented") | ||
} | ||
|
||
// SetCostCenter stores the given cost center | ||
func (c *fakeUsageClient) SetCostCenter(ctx context.Context, in *v1.SetCostCenterRequest, opts ...grpc.CallOption) (*v1.SetCostCenterResponse, error) { | ||
return nil, status.Error(codes.Unauthenticated, "not implemented") | ||
} | ||
|
||
// Triggers reconciliation of usage with ledger implementation. | ||
func (c *fakeUsageClient) ReconcileUsageWithLedger(ctx context.Context, in *v1.ReconcileUsageWithLedgerRequest, opts ...grpc.CallOption) (*v1.ReconcileUsageWithLedgerResponse, error) { | ||
atomic.AddInt32(&c.ReconcileUsageWithLedgerCallCount, 1) | ||
time.Sleep(50 * time.Millisecond) | ||
|
||
return nil, status.Error(codes.Unauthenticated, "not implemented") | ||
} | ||
|
||
// ListUsage retrieves all usage for the specified attributionId and theb given time range | ||
func (c *fakeUsageClient) ListUsage(ctx context.Context, in *v1.ListUsageRequest, opts ...grpc.CallOption) (*v1.ListUsageResponse, error) { | ||
return nil, status.Error(codes.Unauthenticated, "not implemented") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
// Licensed under the GNU Affero General Public License (AGPL). | ||
// See License-AGPL.txt in the project root for license information. | ||
|
||
package controller | ||
package scheduler | ||
|
||
import ( | ||
"fmt" | ||
|
@@ -16,26 +16,26 @@ const ( | |
) | ||
|
||
var ( | ||
reconcileStartedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ | ||
jobStartedSeconds = prometheus.NewCounterVec(prometheus.CounterOpts{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are now general job metrics, rather than specific for the reconcile with ledger. If a job desires more metrics, those can be added in job direclty rather than on the scheduler level. |
||
Namespace: namespace, | ||
Subsystem: subsystem, | ||
Name: "reconcile_started_total", | ||
Help: "Number of usage reconciliation runs started", | ||
}, []string{}) | ||
Name: "scheduler_job_started_total", | ||
Help: "Number of jobs started", | ||
}, []string{"job"}) | ||
|
||
reconcileStartedDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ | ||
jobCompletedSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ | ||
Namespace: namespace, | ||
Subsystem: subsystem, | ||
Name: "reconcile_completed_duration_seconds", | ||
Help: "Histogram of reconcile duration", | ||
Name: "scheduler_job_completed_seconds", | ||
Help: "Histogram of job duration", | ||
Buckets: prometheus.LinearBuckets(30, 30, 10), // every 30 secs, starting at 30secs | ||
}, []string{"outcome"}) | ||
}, []string{"job", "outcome"}) | ||
) | ||
|
||
func RegisterMetrics(reg *prometheus.Registry) error { | ||
metrics := []prometheus.Collector{ | ||
reconcileStartedTotal, | ||
reconcileStartedDurationSeconds, | ||
jobStartedSeconds, | ||
jobCompletedSeconds, | ||
} | ||
for _, metric := range metrics { | ||
err := reg.Register(metric) | ||
|
@@ -47,14 +47,14 @@ func RegisterMetrics(reg *prometheus.Registry) error { | |
return nil | ||
} | ||
|
||
func reportUsageReconcileStarted() { | ||
reconcileStartedTotal.WithLabelValues().Inc() | ||
func reportJobStarted(id string) { | ||
jobStartedSeconds.WithLabelValues(id).Inc() | ||
} | ||
|
||
func reportUsageReconcileFinished(duration time.Duration, err error) { | ||
func reportJobCompleted(id string, duration time.Duration, err error) { | ||
outcome := "success" | ||
if err != nil { | ||
outcome = "error" | ||
} | ||
reconcileStartedDurationSeconds.WithLabelValues(outcome).Observe(duration.Seconds()) | ||
jobCompletedSeconds.WithLabelValues(id, outcome).Observe(duration.Seconds()) | ||
} |
Uh oh!
There was an error while loading. Please reload this page.