Skip to content

Commit 8ec426f

Browse files
committed
Implement BadgerGC with controller runtime
Signed-off-by: leigh capili <[email protected]>
1 parent 2d0f02a commit 8ec426f

File tree

4 files changed

+190
-2
lines changed

4 files changed

+190
-2
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ require (
1717
github.com/fluxcd/pkg/cache v0.9.0
1818
github.com/fluxcd/pkg/runtime v0.60.0
1919
github.com/fluxcd/pkg/version v0.7.0
20+
github.com/go-logr/logr v1.4.2
2021
github.com/google/go-containerregistry v0.20.5
2122
github.com/google/go-containerregistry/pkg/authn/k8schain v0.0.0-20250225234217-098045d5e61f
2223
github.com/onsi/ginkgo v1.16.5
@@ -86,7 +87,6 @@ require (
8687
github.com/fsnotify/fsnotify v1.9.0 // indirect
8788
github.com/fxamacker/cbor/v2 v2.8.0 // indirect
8889
github.com/go-errors/errors v1.5.1 // indirect
89-
github.com/go-logr/logr v1.4.2 // indirect
9090
github.com/go-logr/zapr v1.3.0 // indirect
9191
github.com/go-openapi/jsonpointer v0.21.1 // indirect
9292
github.com/go-openapi/jsonreference v0.21.0 // indirect

internal/database/badger_gc.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
Copyright 2025 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package database
17+
18+
import (
19+
"context"
20+
"errors"
21+
"time"
22+
23+
"github.com/dgraph-io/badger/v3"
24+
"github.com/go-logr/logr"
25+
ctrl "sigs.k8s.io/controller-runtime"
26+
)
27+
28+
// BadgerGarbageCollector implements controller runtime's Runnable
29+
type BadgerGarbageCollector struct {
30+
// DiscardRatio must be a float between 0.0 and 1.0, inclusive
31+
// See badger.DB.RunValueLogGC for more info
32+
DiscardRatio float64
33+
Interval time.Duration
34+
35+
name string
36+
db *badger.DB
37+
log logr.Logger
38+
}
39+
40+
// NewBadgerGarbageCollector creates and returns a new BadgerGarbageCollector
41+
func NewBadgerGarbageCollector(name string, db *badger.DB, interval time.Duration, discardRatio float64) *BadgerGarbageCollector {
42+
return &BadgerGarbageCollector{
43+
DiscardRatio: discardRatio,
44+
Interval: interval,
45+
46+
name: name,
47+
db: db,
48+
}
49+
}
50+
51+
// Start repeatedly runs the BadgerDB garbage collector with a delay inbetween
52+
// runs.
53+
//
54+
// Start blocks until the context is cancelled. The database is expected to
55+
// already be open and not be closed while this context is active.
56+
//
57+
// ctx should be a logr.Logger context.
58+
func (gc *BadgerGarbageCollector) Start(ctx context.Context) error {
59+
gc.log = ctrl.LoggerFrom(ctx).WithName(gc.name)
60+
61+
gc.log.Info("Starting Badger GC")
62+
timer := time.NewTimer(gc.Interval)
63+
for {
64+
select {
65+
case <-timer.C:
66+
gc.discardValueLogFiles()
67+
timer.Reset(gc.Interval)
68+
case <-ctx.Done():
69+
timer.Stop()
70+
gc.log.Info("Stopped Badger GC")
71+
return nil
72+
}
73+
}
74+
}
75+
76+
// upper bound for loop
77+
const maxDiscards = 1000
78+
79+
func (gc *BadgerGarbageCollector) discardValueLogFiles() {
80+
gc.log.V(1).Info("Running Badger GC")
81+
for c := 0; c < maxDiscards; c++ {
82+
err := gc.db.RunValueLogGC(gc.DiscardRatio)
83+
if errors.Is(err, badger.ErrNoRewrite) {
84+
// there is no more garbage to discard
85+
gc.log.V(1).Info("Ran Badger GC", "discarded_vlogs", c)
86+
return
87+
}
88+
if err != nil {
89+
gc.log.Error(err, "Badger GC Error", "discarded_vlogs", c)
90+
return
91+
}
92+
}
93+
gc.log.Error(nil, "Warning: Badger GC ran for maximum discards", "discarded_vlogs", maxDiscards)
94+
}

internal/database/badger_gc_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
Copyright 2020 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package database
17+
18+
import (
19+
"context"
20+
"os"
21+
"testing"
22+
"time"
23+
24+
"github.com/dgraph-io/badger/v3"
25+
"github.com/go-logr/logr"
26+
"github.com/go-logr/logr/testr"
27+
)
28+
29+
func TestBadgerGarbageCollectorDoesStop(t *testing.T) {
30+
badger, db := createBadgerDatabaseForGC(t)
31+
ctx, cancel := context.WithCancel(
32+
logr.NewContext(context.Background(),
33+
testr.NewWithOptions(t, testr.Options{Verbosity: 1, LogTimestamp: true})))
34+
35+
stop := make(chan struct{})
36+
go func() {
37+
gc := NewBadgerGarbageCollector("test-badger-gc", badger, 500*time.Millisecond, 0.01)
38+
gc.Start(ctx)
39+
stop <- struct{}{}
40+
}()
41+
42+
time.Sleep(time.Second)
43+
44+
tags := []string{"latest", "v0.0.1", "v0.0.2"}
45+
fatalIfError(t, db.SetTags(testRepo, tags))
46+
_, err := db.Tags(testRepo)
47+
fatalIfError(t, err)
48+
t.Log("wrote tags successfully")
49+
50+
time.Sleep(time.Second)
51+
52+
cancel()
53+
t.Log("waiting for GC stop")
54+
select {
55+
case <-time.NewTimer(5 * time.Second).C:
56+
t.Fatalf("GC did not stop")
57+
case <-stop:
58+
t.Log("GC Stopped")
59+
}
60+
}
61+
62+
func createBadgerDatabaseForGC(t *testing.T) (*badger.DB, *BadgerDatabase) {
63+
t.Helper()
64+
dir, err := os.MkdirTemp(os.TempDir(), t.Name())
65+
if err != nil {
66+
t.Fatal(err)
67+
}
68+
db, err := badger.Open(badger.DefaultOptions(dir))
69+
if err != nil {
70+
t.Fatal(err)
71+
}
72+
t.Cleanup(func() {
73+
db.Close()
74+
os.RemoveAll(dir)
75+
})
76+
return db, NewBadgerDatabase(db)
77+
}

main.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"fmt"
2222
"os"
23+
"time"
2324

2425
"github.com/dgraph-io/badger/v3"
2526
flag "github.com/spf13/pflag"
@@ -61,7 +62,10 @@ import (
6162
"github.com/fluxcd/image-reflector-controller/internal/registry"
6263
)
6364

64-
const controllerName = "image-reflector-controller"
65+
const (
66+
controllerName = "image-reflector-controller"
67+
discardRatio = 0.7
68+
)
6569

6670
var (
6771
scheme = runtime.NewScheme()
@@ -90,6 +94,7 @@ func main() {
9094
watchOptions helper.WatchOptions
9195
storagePath string
9296
storageValueLogFileSize int64
97+
gcInterval uint16 // max value is 65535 minutes (~ 45 days) which is well under the maximum time.Duration
9398
concurrent int
9499
awsAutoLogin bool
95100
gcpAutoLogin bool
@@ -105,6 +110,7 @@ func main() {
105110
flag.StringVar(&healthAddr, "health-addr", ":9440", "The address the health endpoint binds to.")
106111
flag.StringVar(&storagePath, "storage-path", "/data", "Where to store the persistent database of image metadata")
107112
flag.Int64Var(&storageValueLogFileSize, "storage-value-log-file-size", 1<<28, "Set the database's memory mapped value log file size in bytes. Effective memory usage is about two times this size.")
113+
flag.Uint16Var(&gcInterval, "gc-interval", 10, "The number of minutes to wait between garbage collections. 0 disables the garbage collector.")
108114
flag.IntVar(&concurrent, "concurrent", 4, "The number of concurrent resource reconciles.")
109115

110116
// NOTE: Deprecated flags.
@@ -152,7 +158,14 @@ func main() {
152158
os.Exit(1)
153159
}
154160
defer badgerDB.Close()
161+
155162
db := database.NewBadgerDatabase(badgerDB)
163+
var badgerGC *database.BadgerGarbageCollector
164+
if gcInterval > 0 {
165+
badgerGC = database.NewBadgerGarbageCollector("badger-gc", badgerDB, time.Duration(gcInterval)*time.Minute, discardRatio)
166+
} else {
167+
setupLog.V(1).Info("Badger garbage collector is disabled")
168+
}
156169

157170
watchNamespace := ""
158171
if !watchOptions.AllNamespaces {
@@ -225,6 +238,10 @@ func main() {
225238
os.Exit(1)
226239
}
227240

241+
if badgerGC != nil {
242+
mgr.Add(badgerGC)
243+
}
244+
228245
probes.SetupChecks(mgr, setupLog)
229246

230247
var eventRecorder *events.Recorder

0 commit comments

Comments
 (0)