Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions core/state/pruner/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
package pruner

import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"io"
"os"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
bloomfilter "github.com/holiman/bloomfilter/v2"
)

Expand Down Expand Up @@ -73,27 +77,54 @@ func newStateBloomWithSize(size uint64) (*stateBloom, error) {

// NewStateBloomFromDisk loads the state bloom from the given file.
// In this case it is assumed that the bloom filter is complete.
func NewStateBloomFromDisk(filename string) (*stateBloom, error) {
bloom, _, err := bloomfilter.ReadFile(filename)
func NewStateBloomFromDisk(filename string) (*stateBloom, []common.Hash, error) {
f, err := os.Open(filename)
if err != nil {
return nil, err
return nil, nil, err
}
return &stateBloom{bloom: bloom}, nil
defer f.Close()
r := bufio.NewReader(f)
version := []byte{0}
_, err = io.ReadFull(r, version)
if err != nil {
return nil, nil, err
}
if version[0] != 0 {
return nil, nil, fmt.Errorf("unknown state bloom filter version %v", version[0])
}
var roots []common.Hash
err = rlp.Decode(r, &roots)
if err != nil {
return nil, nil, err
}
bloom, _, err := bloomfilter.ReadFrom(r)
if err != nil {
return nil, nil, err
}
return &stateBloom{bloom: bloom}, roots, nil
}

// Commit flushes the bloom filter content to disk and marks the bloom
// as complete.
func (bloom *stateBloom) Commit(filename, tempname string) error {
// Write the bloom out into a temporary file
_, err := bloom.bloom.WriteFile(tempname)
func (bloom *stateBloom) Commit(filename, tempname string, roots []common.Hash) error {
f, err := os.OpenFile(tempname, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return err
}
// Ensure the file is synced to disk
f, err := os.OpenFile(tempname, os.O_RDWR, 0666)
_, err = f.Write([]byte{0}) // version
if err != nil {
return err
}
err = rlp.Encode(f, roots)
if err != nil {
return err
}
// Write the bloom out into a temporary file
_, err = bloom.bloom.WriteTo(f)
if err != nil {
return err
}
// Ensure the file is synced to disk
if err := f.Sync(); err != nil {
f.Close()
return err
Expand Down
75 changes: 23 additions & 52 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"

Expand All @@ -43,11 +42,8 @@ import (
)

const (
// stateBloomFilePrefix is the filename prefix of state bloom filter.
stateBloomFilePrefix = "statebloom"

// stateBloomFileSuffix is the filename suffix of the state bloom filter.
stateBloomFileSuffix = "bf.gz"
// stateBloomFileName is the filename of state bloom filter.
stateBloomFileName = "statebloom.bf.gz"

// stateBloomFileTempSuffix is the filename suffix of state bloom filter
// while it is being written out to detect write aborts.
Expand Down Expand Up @@ -439,11 +435,11 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
// reuse it for pruning instead of generating a new one. It's
// mandatory because a part of the state may already be deleted, so
// the recovery procedure is necessary.
_, stateBloomRoots, err := findBloomFilter(p.datadir)
bloomExists, err := bloomFilterExists(p.datadir)
if err != nil {
return err
}
if len(stateBloomRoots) > 0 {
if bloomExists {
return RecoverPruning(p.datadir, p.db, p.trieCachePath)
}
// Retrieve all snapshot layers from the current HEAD.
Expand Down Expand Up @@ -525,13 +521,13 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
if err := extractGenesis(p.db, p.stateBloom); err != nil {
return err
}
filterName := bloomFilterName(p.datadir, roots)
filterName := bloomFilterPath(p.datadir)

log.Info("Writing state bloom to disk", "name", filterName)
if err := p.stateBloom.Commit(filterName, filterName+stateBloomFileTempSuffix); err != nil {
log.Info("Writing state bloom to disk", "name", filterName, "roots", roots)
if err := p.stateBloom.Commit(filterName, filterName+stateBloomFileTempSuffix, roots); err != nil {
return err
}
log.Info("State bloom filter committed", "name", filterName)
log.Info("State bloom filter committed", "name", filterName, "roots", roots)
return prune(p.snaptree, roots, p.db, p.stateBloom, filterName, start)
}

Expand All @@ -543,11 +539,11 @@ func (p *Pruner) Prune(inputRoots []common.Hash) error {
// pruning **has to be resumed**. Otherwise a lot of dangling nodes may be left
// on the disk.
func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) error {
stateBloomPath, stateBloomRoots, err := findBloomFilter(datadir)
exists, err := bloomFilterExists(datadir)
if err != nil {
return err
}
if stateBloomPath == "" {
if !exists {
return nil // nothing to recover
}
headBlock := rawdb.ReadHeadBlock(db)
Expand All @@ -566,11 +562,12 @@ func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) err
if err != nil {
return err // The relevant snapshot(s) might not exist
}
stateBloom, err := NewStateBloomFromDisk(stateBloomPath)
stateBloomPath := bloomFilterPath(datadir)
stateBloom, stateBloomRoots, err := NewStateBloomFromDisk(stateBloomPath)
if err != nil {
return err
}
log.Info("Loaded state bloom filter", "path", stateBloomPath)
log.Info("Loaded state bloom filter", "path", stateBloomPath, "roots", stateBloomRoots)

// Before starting the pruning, delete the clean trie cache first.
// It's necessary otherwise in the next restart we will hit the
Expand All @@ -595,45 +592,19 @@ func extractGenesis(db ethdb.Database, stateBloom *stateBloom) error {
return dumpRawTrieDescendants(db, genesis.Root(), stateBloom)
}

func bloomFilterName(datadir string, hashes []common.Hash) string {
path := filepath.Join(datadir, stateBloomFilePrefix)
for _, hash := range hashes {
path += "." + hash.String()
}
return path + "." + stateBloomFileSuffix
}

func isBloomFilter(filename string) (bool, []common.Hash) {
filename = filepath.Base(filename)
if strings.HasPrefix(filename, stateBloomFilePrefix) && strings.HasSuffix(filename, stateBloomFileSuffix) {
parts := strings.Split(filename[len(stateBloomFilePrefix)+1:len(filename)-len(stateBloomFileSuffix)-1], ".")
var roots []common.Hash
for _, part := range parts {
roots = append(roots, common.HexToHash(part))
}
return true, roots
}
return false, nil
func bloomFilterPath(datadir string) string {
return filepath.Join(datadir, stateBloomFileName)
}

func findBloomFilter(datadir string) (string, []common.Hash, error) {
var (
stateBloomPath string
stateBloomRoots []common.Hash
)
if err := filepath.Walk(datadir, func(path string, info os.FileInfo, err error) error {
if info != nil && !info.IsDir() {
ok, roots := isBloomFilter(path)
if ok {
stateBloomPath = path
stateBloomRoots = roots
}
}
return nil
}); err != nil {
return "", nil, err
func bloomFilterExists(datadir string) (bool, error) {
_, err := os.Stat(bloomFilterPath(datadir))
if errors.Is(err, os.ErrNotExist) {
return false, nil
} else if err != nil {
return false, err
} else {
return true, nil
}
return stateBloomPath, stateBloomRoots, nil
}

const warningLog = `
Expand Down