[Access] Add support for multi-store reading of collections & transactions in Access Nodes #7321

Merged · 7 commits · Apr 26, 2025
22 changes: 17 additions & 5 deletions cmd/access/node_builder/access_node_builder.go
@@ -116,6 +116,9 @@ import (
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/dbops"
"github.com/onflow/flow-go/storage/operation"
"github.com/onflow/flow-go/storage/operation/badgerimpl"
pstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/storage/store"
"github.com/onflow/flow-go/utils/grpcutils"
@@ -579,12 +582,21 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return stateSyncCommands.NewReadExecutionDataCommand(builder.ExecutionDataStore)
}).
Module("transactions and collections storage", func(node *cmd.NodeConfig) error {
// TODO: needs to be wrapped with ChainedCollections module, otherwise once we switch
// ProtocolDB to pebble-based storage, the data previously stored in badger will not be
// accessible.
transactions := store.NewTransactions(node.Metrics.Cache, node.ProtocolDB)
builder.collections = store.NewCollections(node.ProtocolDB, transactions)

dbStore := node.ProtocolDB

if dbops.IsPebbleBatch(node.DBOps) {
// Create multiDBStore with node.ProtocolDB as the primary read-and-write store,
// and node.DB as the secondary read-only store.
badgerDB := badgerimpl.ToDB(node.DB)
dbStore = operation.NewMultiDBStore(node.ProtocolDB, badgerDB)
}

transactions := store.NewTransactions(node.Metrics.Cache, dbStore)
collections := store.NewCollections(dbStore, transactions)
builder.transactions = transactions
builder.collections = collections

return nil
}).
Module("execution data datastore and blobstore", func(node *cmd.NodeConfig) error {
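As a reading aid, the multi-store wiring above amounts to a reader that consults the primary (pebble-backed) store first and falls back to the secondary (badger-backed) store only when a key is absent. The sketch below is illustrative only — it is not the actual `operation.NewMultiDBStore` implementation, `multiDBReader` is a hypothetical name, and the `Get` signature is assumed to follow flow-go's `storage.Reader` convention:

```go
package sketch

import (
	"errors"
	"io"

	"github.com/onflow/flow-go/storage"
)

// multiDBReader sketches the fallback-read semantics described in the diff
// above. Assumes storage.Reader exposes Get(key) (value, closer, error); the
// real MultiDBStore may differ in structure and error handling.
type multiDBReader struct {
	primary   storage.Reader // pebble-backed: receives all new writes
	secondary storage.Reader // badger-backed: read-only historical data
}

func (r multiDBReader) Get(key []byte) ([]byte, io.Closer, error) {
	value, closer, err := r.primary.Get(key)
	if err == nil {
		return value, closer, nil
	}
	if !errors.Is(err, storage.ErrNotFound) {
		return nil, nil, err
	}
	// key not yet in pebble: fall back to data still stored in badger
	return r.secondary.Get(key)
}
```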
11 changes: 7 additions & 4 deletions cmd/execution_builder.go
@@ -310,8 +310,11 @@ func (exeNode *ExecutionNode) LoadExecutionMetrics(node *NodeConfig) error {
// this is guaranteed to exist because LoadBootstrapper has inserted
// the root block as the executed block
var blockID flow.Identifier
reader := node.ProtocolDB.Reader()
err := operation.RetrieveExecutedBlock(reader, &blockID)
reader, err := node.ProtocolDB.Reader()
if err != nil {
return err
}
err = operation.RetrieveExecutedBlock(reader, &blockID)
if err != nil {
// database has not been bootstrapped yet
if errors.Is(err, storageerr.ErrNotFound) {
@@ -348,13 +351,13 @@ func (exeNode *ExecutionNode) LoadExecutionStorage(
exeNode.myReceipts = store.NewMyExecutionReceipts(node.Metrics.Cache, db, exeNode.receipts)
exeNode.txResults = store.NewTransactionResults(node.Metrics.Cache, db, exeNode.exeConf.transactionResultsCacheSize)

if dbops.IsBadgerBased(node.dbops) {
if dbops.IsBadgerBased(node.DBOps) {
// if data are stored in badger, we can use the same storage for all data
exeNode.eventsReader = exeNode.events
exeNode.commitsReader = exeNode.commits
exeNode.resultsReader = exeNode.results
exeNode.txResultsReader = exeNode.txResults
} else if dbops.IsPebbleBatch(node.dbops) {
} else if dbops.IsPebbleBatch(node.DBOps) {
// when data are stored in pebble, we need to use chained storage to query data from
// both pebble and badger
// note the pebble storage is the first argument, and badger storage is the second, so
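The ordering the comment calls out (pebble storage as the first argument, badger as the second) is the same fallback rule applied per storage module. A minimal sketch under assumptions: `chainedCommits`, its field names, and the `commitsByBlock` interface are illustrative stand-ins, not the PR's actual chained types:

```go
package sketch

import (
	"errors"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/storage"
)

// commitsByBlock is the minimal read surface assumed for this sketch;
// flow-go's actual reader interfaces may differ.
type commitsByBlock interface {
	ByBlockID(flow.Identifier) (flow.StateCommitment, error)
}

// chainedCommits queries the pebble-backed store first and falls back to the
// badger-backed store only when the commit is not found.
type chainedCommits struct {
	first  commitsByBlock // pebble-backed: new data
	second commitsByBlock // badger-backed: historical data
}

func (c chainedCommits) ByBlockID(blockID flow.Identifier) (flow.StateCommitment, error) {
	commit, err := c.first.ByBlockID(blockID)
	if errors.Is(err, storage.ErrNotFound) {
		return c.second.ByBlockID(blockID)
	}
	return commit, err
}
```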
4 changes: 2 additions & 2 deletions cmd/node_builder.go
@@ -156,7 +156,7 @@ type BaseConfig struct {
datadir string
pebbleDir string
pebbleCheckpointsDir string
dbops string
DBOps string
badgerDB *badger.DB
pebbleDB *pebble.DB
secretsdir string
@@ -280,7 +280,7 @@ func DefaultBaseConfig() *BaseConfig {
BootstrapDir: "bootstrap",
datadir: datadir,
pebbleDir: pebbleDir,
dbops: string(dbops.BadgerTransaction), // "badger-transaction" (default) or "batch-update"
DBOps: string(dbops.BadgerTransaction), // "badger-transaction" (default) or "batch-update"
badgerDB: nil,
pebbleDB: nil,
secretsdir: NotSet,
8 changes: 4 additions & 4 deletions cmd/scaffold.go
@@ -174,7 +174,7 @@ func (fnb *FlowNodeBuilder) BaseFlags() {
fnb.flags.StringVar(&fnb.BaseConfig.pebbleCheckpointsDir, "pebble-checkpoints-dir", defaultConfig.pebbleCheckpointsDir, "directory to store the checkpoints for the public pebble database (protocol state)")
fnb.flags.StringVar(&fnb.BaseConfig.pebbleDir, "pebble-dir", defaultConfig.pebbleDir, "directory to store the public pebble database (protocol state)")
fnb.flags.StringVar(&fnb.BaseConfig.secretsdir, "secretsdir", defaultConfig.secretsdir, "directory to store private database (secrets)")
fnb.flags.StringVar(&fnb.BaseConfig.dbops, "dbops", defaultConfig.dbops, "database operations to use (badger-transaction, batch-update, pebble-update)")
fnb.flags.StringVar(&fnb.BaseConfig.DBOps, "dbops", defaultConfig.DBOps, "database operations to use (badger-transaction, batch-update, pebble-update)")
fnb.flags.StringVarP(&fnb.BaseConfig.level, "loglevel", "l", defaultConfig.level, "level for logging output")
fnb.flags.Uint32Var(&fnb.BaseConfig.debugLogLimit, "debug-log-limit", defaultConfig.debugLogLimit, "max number of debug/trace log events per second")
fnb.flags.UintVarP(&fnb.BaseConfig.metricsPort, "metricport", "m", defaultConfig.metricsPort, "port for /metrics endpoint")
@@ -1156,14 +1156,14 @@ func (fnb *FlowNodeBuilder) initPebbleDB() error {

// create protocol db according to the badger or pebble db
func (fnb *FlowNodeBuilder) initProtocolDB(bdb *badger.DB, pdb *pebble.DB) error {
if dbops.IsBadgerBased(fnb.dbops) {
if dbops.IsBadgerBased(fnb.DBOps) {
fnb.ProtocolDB = badgerimpl.ToDB(bdb)
fnb.Logger.Info().Msg("initProtocolDB: using badger protocol db")
} else if dbops.IsPebbleBatch(fnb.dbops) {
} else if dbops.IsPebbleBatch(fnb.DBOps) {
fnb.ProtocolDB = pebbleimpl.ToDB(pdb)
fnb.Logger.Info().Msgf("initProtocolDB: using pebble protocol db")
} else {
return fmt.Errorf(dbops.UsageErrMsg, fnb.dbops)
return fmt.Errorf(dbops.UsageErrMsg, fnb.DBOps)
}
return nil
}
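For operators, the accepted values come straight from the `--dbops` usage string registered above, with `badger-transaction` as the default; the binary name below is a placeholder, not confirmed by this diff:

```sh
# values per the --dbops usage string; badger-transaction is the default
<node-binary> --dbops=badger-transaction
<node-binary> --dbops=batch-update
<node-binary> --dbops=pebble-update
```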
@@ -51,8 +51,13 @@ func ExportDeltaSnapshots(blockID flow.Identifier, dbPath string, outputPath str
return nil
}

reader, err := sdb.Reader()
if err != nil {
return err
}

var snap []*snapshot.ExecutionSnapshot
err = operation.RetrieveExecutionStateInteractions(sdb.Reader(), activeBlockID, &snap)
err = operation.RetrieveExecutionStateInteractions(reader, activeBlockID, &snap)
if err != nil {
return fmt.Errorf("could not load delta snapshot: %w", err)
}
8 changes: 7 additions & 1 deletion cmd/util/cmd/read-badger/cmd/stats.go
@@ -47,7 +47,13 @@ var statsCmd = &cobra.Command{
numWorkers = 256
}
log.Info().Msgf("getting stats for %s db at %s with %v workers", flagDBType, flagDatadir, numWorkers)
stats, err := operation.SummarizeKeysByFirstByteConcurrent(log.Logger, sdb.Reader(), numWorkers)

reader, err := sdb.Reader()
if err != nil {
return fmt.Errorf("failed to get reader: %w", err)
}

stats, err := operation.SummarizeKeysByFirstByteConcurrent(log.Logger, reader, numWorkers)
if err != nil {
return fmt.Errorf("failed to get stats: %w", err)
}
12 changes: 6 additions & 6 deletions cmd/verification_builder.go
@@ -169,7 +169,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
var ok bool
var err error

if dbops.IsBadgerTransaction(node.dbops) {
if dbops.IsBadgerTransaction(node.DBOps) {
queue := badger.NewChunkQueue(node.DB)
ok, err = queue.Init(chunkconsumer.DefaultJobIndex)
if err != nil {
@@ -178,7 +178,7 @@

chunkQueue = queue
node.Logger.Info().Msgf("chunks queue index has been initialized with badger db transaction updates")
} else if dbops.IsBatchUpdate(node.dbops) {
} else if dbops.IsBatchUpdate(node.DBOps) {
queue := store.NewChunkQueue(node.Metrics.Cache, node.ProtocolDB)
ok, err = queue.Init(chunkconsumer.DefaultJobIndex)
if err != nil {
@@ -188,7 +188,7 @@
chunkQueue = queue
node.Logger.Info().Msgf("chunks queue index has been initialized with protocol db batch updates")
} else {
return fmt.Errorf(dbops.UsageErrMsg, v.dbops)
return fmt.Errorf(dbops.UsageErrMsg, v.DBOps)
}

node.Logger.Info().
@@ -225,12 +225,12 @@
chunkVerifier := chunks.NewChunkVerifier(vm, vmCtx, node.Logger)

var approvalStorage storage.ResultApprovals
if dbops.IsBadgerTransaction(v.dbops) {
if dbops.IsBadgerTransaction(v.DBOps) {
approvalStorage = badger.NewResultApprovals(node.Metrics.Cache, node.DB)
} else if dbops.IsBatchUpdate(v.dbops) {
} else if dbops.IsBatchUpdate(v.DBOps) {
approvalStorage = store.NewResultApprovals(node.Metrics.Cache, node.ProtocolDB)
} else {
return nil, fmt.Errorf("invalid db opts type: %v", v.dbops)
return nil, fmt.Errorf("invalid db opts type: %v", v.DBOps)
}

verifierEng, err = verifier.New(
7 changes: 6 additions & 1 deletion engine/execution/migration/badgerpebble.go
@@ -106,8 +106,13 @@ func MigrateLastSealedExecutedResultToPebble(logger zerolog.Logger, badgerDB *ba
// create pebble storage modules
pebbleResults, pebbleCommits := createStores(pdb)

reader, err := pdb.Reader()
if err != nil {
return err
}

var existingExecuted flow.Identifier
err = operation.RetrieveExecutedBlock(pdb.Reader(), &existingExecuted)
err = operation.RetrieveExecutedBlock(reader, &existingExecuted)
if err == nil {
// there is an executed block in pebble, compare if it's newer than the badger one,
// if newer, it means EN is storing new results in pebble, in this case, we don't
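The (truncated) comment describes a guard: when pebble already holds an executed block, its height is compared against badger's, and a newer pebble block means the EN has already switched over, so the migration is skipped. A hedged sketch of that decision — helper name is illustrative, and the exact comparison (`>=` vs `>`) is assumed rather than confirmed by the visible diff:

```go
package sketch

import "github.com/onflow/flow-go/model/flow"

// pebbleAlreadyAhead reports whether the migration can be skipped because the
// execution node is already writing new results to pebble. Hypothetical
// helper; the real check in MigrateLastSealedExecutedResultToPebble may
// compare differently.
func pebbleAlreadyAhead(badgerExecuted, pebbleExecuted *flow.Header) bool {
	return pebbleExecuted.Height >= badgerExecuted.Height
}
```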
7 changes: 6 additions & 1 deletion engine/execution/state/bootstrap/bootstrap.go
@@ -79,7 +79,12 @@ func (b *Bootstrapper) BootstrapLedger(
func (b *Bootstrapper) IsBootstrapped(db storage.DB) (flow.StateCommitment, bool, error) {
var commit flow.StateCommitment

err := operation.LookupStateCommitment(db.Reader(), flow.ZeroID, &commit)
reader, err := db.Reader()
if err != nil {
return flow.DummyStateCommitment, false, err
}

err = operation.LookupStateCommitment(reader, flow.ZeroID, &commit)
if errors.Is(err, storage.ErrNotFound) {
return flow.DummyStateCommitment, false, nil
}
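Because `storage.ErrNotFound` is translated into `(flow.DummyStateCommitment, false, nil)`, callers can tell "not yet bootstrapped" apart from a genuine storage failure without inspecting error sentinels. A minimal usage sketch based on the signature shown above; the wrapper function itself is illustrative:

```go
package sketch

import (
	"fmt"

	"github.com/onflow/flow-go/engine/execution/state/bootstrap"
	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/storage"
)

// ensureBootstrapped is an illustrative caller of IsBootstrapped: a nil error
// with ok == false means the state simply has not been bootstrapped yet.
func ensureBootstrapped(b *bootstrap.Bootstrapper, db storage.DB) (flow.StateCommitment, error) {
	commit, ok, err := b.IsBootstrapped(db)
	if err != nil {
		return flow.DummyStateCommitment, fmt.Errorf("could not check bootstrap state: %w", err)
	}
	if !ok {
		return flow.DummyStateCommitment, fmt.Errorf("execution state not bootstrapped")
	}
	return commit, nil
}
```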
7 changes: 6 additions & 1 deletion engine/execution/state/state.go
@@ -484,8 +484,13 @@ func (s *state) GetLastExecutedBlockID(ctx context.Context) (uint64, flow.Identi
return height, finalizedID, nil
}

reader, err := s.db.Reader()
if err != nil {
return 0, flow.ZeroID, err
}

var blockID flow.Identifier
err := operation.RetrieveExecutedBlock(s.db.Reader(), &blockID)
err = operation.RetrieveExecutedBlock(reader, &blockID)
if err != nil {
return 0, flow.ZeroID, err
}
21 changes: 17 additions & 4 deletions module/block_iterator/executor/executor_test.go
@@ -61,9 +61,12 @@ func TestExecute(t *testing.T) {

// expect all blocks are pruned
for _, b := range bs {
reader, err := pdb.Reader()
require.NoError(t, err)

// verify they are pruned
var c storage.StoredChunkDataPack
err := operation.RetrieveChunkDataPack(pdb.Reader(), b, &c)
err = operation.RetrieveChunkDataPack(reader, b, &c)
require.True(t, errors.Is(err, storage.ErrNotFound), "expected ErrNotFound but got %v", err)
}
})
@@ -118,18 +121,25 @@ func TestExecuteCanBeResumed(t *testing.T) {

// expect all blocks are pruned
for i, b := range bs {

// verify they are pruned
var c storage.StoredChunkDataPack

if i < 3 {
reader, err := pdb.Reader()
require.NoError(t, err)

// the first 3 blocks in the first batch are pruned
err := operation.RetrieveChunkDataPack(pdb.Reader(), b, &c)
err = operation.RetrieveChunkDataPack(reader, b, &c)
require.True(t, errors.Is(err, storage.ErrNotFound), "expected ErrNotFound for block %v but got %v", i, err)
continue
}

// verify the remaining blocks are not pruned yet
require.NoError(t, operation.RetrieveChunkDataPack(pdb.Reader(), b, &c))
reader, err := pdb.Reader()
require.NoError(t, err)

require.NoError(t, operation.RetrieveChunkDataPack(reader, b, &c))
}

// now resume the pruning
@@ -146,9 +156,12 @@

// verify all blocks are pruned
for _, b := range bs {
reader, err := pdb.Reader()
require.NoError(t, err)

var c storage.StoredChunkDataPack
// the first 5 blocks are pruned
err := operation.RetrieveChunkDataPack(pdb.Reader(), b, &c)
err = operation.RetrieveChunkDataPack(reader, b, &c)
require.True(t, errors.Is(err, storage.ErrNotFound), "expected ErrNotFound but got %v", err)
}
})
7 changes: 6 additions & 1 deletion module/block_iterator/latest/sealed_and_executed.go
@@ -66,8 +66,13 @@ func LatestSealedAndExecutedHeight(state protocol.State, db storage.DB) (uint64,
return 0, err
}

reader, err := db.Reader()
if err != nil {
return 0, err
}

var blockID flow.Identifier
err = operation.RetrieveExecutedBlock(db.Reader(), &blockID)
err = operation.RetrieveExecutedBlock(reader, &blockID)
if err != nil {
return 0, err
}
14 changes: 12 additions & 2 deletions storage/mock/db.go

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions storage/operation/badgerimpl/dbstore.go
@@ -16,8 +16,8 @@ type dbStore struct {

var _ (storage.DB) = (*dbStore)(nil)

func (b *dbStore) Reader() storage.Reader {
return dbReader{db: b.db}
func (b *dbStore) Reader() (storage.Reader, error) {
return dbReader{db: b.db}, nil
}

func (b *dbStore) WithReaderBatchWriter(fn func(storage.ReaderBatchWriter) error) error {
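This is the interface change driving the call-site updates throughout this PR: `Reader()` now returns `(storage.Reader, error)`. The badger implementation can never fail and returns a nil error, but callers must still check it, since other backends may acquire readers fallibly (the precise failure modes are not shown in this diff). A self-contained sketch of the resulting caller pattern, using the same `operation.RetrieveExecutedBlock` lookup seen above (the wrapper function is illustrative):

```go
package sketch

import (
	"fmt"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/operation"
)

// lastExecutedBlockID demonstrates the acquire-then-check pattern used at
// every updated call site in this PR.
func lastExecutedBlockID(db storage.DB) (flow.Identifier, error) {
	reader, err := db.Reader()
	if err != nil {
		return flow.ZeroID, fmt.Errorf("could not acquire reader: %w", err)
	}
	var blockID flow.Identifier
	if err := operation.RetrieveExecutedBlock(reader, &blockID); err != nil {
		return flow.ZeroID, fmt.Errorf("could not retrieve executed block: %w", err)
	}
	return blockID, nil
}
```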
15 changes: 12 additions & 3 deletions storage/operation/chunk_data_packs_test.go
@@ -23,8 +23,11 @@ func TestChunkDataPack(t *testing.T) {
}

t.Run("Retrieve non-existent", func(t *testing.T) {
reader, err := db.Reader()
require.NoError(t, err)

var actual storage.StoredChunkDataPack
err := operation.RetrieveChunkDataPack(db.Reader(), expected.ChunkID, &actual)
err = operation.RetrieveChunkDataPack(reader, expected.ChunkID, &actual)
assert.Error(t, err)
})

@@ -34,8 +37,11 @@
})
require.NoError(t, err)

reader, err := db.Reader()
require.NoError(t, err)

var actual storage.StoredChunkDataPack
err = operation.RetrieveChunkDataPack(db.Reader(), expected.ChunkID, &actual)
err = operation.RetrieveChunkDataPack(reader, expected.ChunkID, &actual)
assert.NoError(t, err)

assert.Equal(t, *expected, actual)
@@ -47,8 +53,11 @@
})
require.NoError(t, err)

reader, err := db.Reader()
require.NoError(t, err)

var actual storage.StoredChunkDataPack
err = operation.RetrieveChunkDataPack(db.Reader(), expected.ChunkID, &actual)
err = operation.RetrieveChunkDataPack(reader, expected.ChunkID, &actual)
assert.Error(t, err)
})
})