Skip to content

channeldb: add reject and channel caches #2847

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions channeldb/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error {
return ErrNoRestoredChannelMutation
}

err := c.Db.Batch(func(tx *bbolt.Tx) error {
err := c.Db.Update(func(tx *bbolt.Tx) error {
chanBucket, err := fetchChanBucket(
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
Expand Down Expand Up @@ -1465,7 +1465,7 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error {
return ErrNoRestoredChannelMutation
}

return c.Db.Batch(func(tx *bbolt.Tx) error {
return c.Db.Update(func(tx *bbolt.Tx) error {
// First, we'll grab the writable bucket where this channel's
// data resides.
chanBucket, err := fetchChanBucket(
Expand Down Expand Up @@ -1608,9 +1608,7 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error {

var newRemoteCommit *ChannelCommitment

err := c.Db.Batch(func(tx *bbolt.Tx) error {
newRemoteCommit = nil

err := c.Db.Update(func(tx *bbolt.Tx) error {
chanBucket, err := fetchChanBucket(
tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
Expand Down Expand Up @@ -1748,7 +1746,7 @@ func (c *OpenChannel) AckAddHtlcs(addRefs ...AddRef) error {
c.Lock()
defer c.Unlock()

return c.Db.Batch(func(tx *bbolt.Tx) error {
return c.Db.Update(func(tx *bbolt.Tx) error {
return c.Packager.AckAddHtlcs(tx, addRefs...)
})
}
Expand All @@ -1761,7 +1759,7 @@ func (c *OpenChannel) AckSettleFails(settleFailRefs ...SettleFailRef) error {
c.Lock()
defer c.Unlock()

return c.Db.Batch(func(tx *bbolt.Tx) error {
return c.Db.Update(func(tx *bbolt.Tx) error {
return c.Packager.AckSettleFails(tx, settleFailRefs...)
})
}
Expand All @@ -1772,7 +1770,7 @@ func (c *OpenChannel) SetFwdFilter(height uint64, fwdFilter *PkgFilter) error {
c.Lock()
defer c.Unlock()

return c.Db.Batch(func(tx *bbolt.Tx) error {
return c.Db.Update(func(tx *bbolt.Tx) error {
return c.Packager.SetFwdFilter(tx, height, fwdFilter)
})
}
Expand All @@ -1785,15 +1783,14 @@ func (c *OpenChannel) RemoveFwdPkg(height uint64) error {
c.Lock()
defer c.Unlock()

return c.Db.Batch(func(tx *bbolt.Tx) error {
return c.Db.Update(func(tx *bbolt.Tx) error {
return c.Packager.RemovePkg(tx, height)
})
}

// RevocationLogTail returns the "tail", or the end of the current revocation
// log. This entry represents the last previous state for the remote node's
// commitment chain. The ChannelDelta returned by this method will always lag
// one state behind the most current (unrevoked) state of the remote node's
// commitment chain. The ChannelDelta returned by this method will always lag one state behind the most current (unrevoked) state of the remote node's
// commitment chain.
func (c *OpenChannel) RevocationLogTail() (*ChannelCommitment, error) {
c.RLock()
Expand Down
50 changes: 50 additions & 0 deletions channeldb/channel_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package channeldb

// channelCache is an in-memory cache used to improve the performance of
// ChanUpdatesInHorizon. It caches the chan info and edge policies for a
// particular channel.
type channelCache struct {
n int
channels map[uint64]ChannelEdge
}

// newChannelCache creates a new channelCache with maximum capacity of n
// channels.
func newChannelCache(n int) *channelCache {
return &channelCache{
n: n,
channels: make(map[uint64]ChannelEdge),
}
}

// get returns the channel from the cache, if it exists.
func (c *channelCache) get(chanid uint64) (ChannelEdge, bool) {
channel, ok := c.channels[chanid]
return channel, ok
}

// insert adds the entry to the channel cache. If an entry for chanid already
// exists, it will be replaced with the new entry. If the entry doesn't exist,
// it will be inserted to the cache, performing a random eviction if the cache
// is at capacity.
func (c *channelCache) insert(chanid uint64, channel ChannelEdge) {
// If entry exists, replace it.
if _, ok := c.channels[chanid]; ok {
c.channels[chanid] = channel
return
}

// Otherwise, evict an entry at random and insert.
if len(c.channels) == c.n {
for id := range c.channels {
delete(c.channels, id)
break
}
}
c.channels[chanid] = channel
}

// remove deletes an edge for chanid from the cache, if it exists.
func (c *channelCache) remove(chanid uint64) {
delete(c.channels, chanid)
}
105 changes: 105 additions & 0 deletions channeldb/channel_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package channeldb

import (
"reflect"
"testing"
)

// TestChannelCache checks the behavior of the channelCache with respect to
// insertion, eviction, and removal of cache entries.
func TestChannelCache(t *testing.T) {
const cacheSize = 100

// Create a new channel cache with the configured max size.
c := newChannelCache(cacheSize)

// As a sanity check, assert that querying the empty cache does not
// return an entry.
_, ok := c.get(0)
if ok {
t.Fatalf("channel cache should be empty")
}

// Now, fill up the cache entirely.
for i := uint64(0); i < cacheSize; i++ {
c.insert(i, channelForInt(i))
}

// Assert that the cache has all of the entries just inserted, since no
// eviction should occur until we try to surpass the max size.
assertHasChanEntries(t, c, 0, cacheSize)

// Now, insert a new element that causes the cache to evict an element.
c.insert(cacheSize, channelForInt(cacheSize))

// Assert that the cache has this last entry, as the cache should evict
// some prior element and not the newly inserted one.
assertHasChanEntries(t, c, cacheSize, cacheSize)

// Iterate over all inserted elements and construct a set of the evicted
// elements.
evicted := make(map[uint64]struct{})
for i := uint64(0); i < cacheSize+1; i++ {
_, ok := c.get(i)
if !ok {
evicted[i] = struct{}{}
}
}

// Assert that exactly one element has been evicted.
numEvicted := len(evicted)
if numEvicted != 1 {
t.Fatalf("expected one evicted entry, got: %d", numEvicted)
}

// Remove the highest item which initially caused the eviction and
// reinsert the element that was evicted prior.
c.remove(cacheSize)
for i := range evicted {
c.insert(i, channelForInt(i))
}

// Since the removal created an extra slot, the last insertion should
// not have caused an eviction and the entries for all channels in the
// original set that filled the cache should be present.
assertHasChanEntries(t, c, 0, cacheSize)

// Finally, reinsert the existing set back into the cache and test that
// the cache still has all the entries. If the randomized eviction were
// happening on inserts for existing cache items, we expect this to fail
// with high probability.
for i := uint64(0); i < cacheSize; i++ {
c.insert(i, channelForInt(i))
}
assertHasChanEntries(t, c, 0, cacheSize)

}

// assertHasEntries queries the edge cache for all channels in the range [start,
// end), asserting that they exist and their value matches the entry produced by
// entryForInt.
func assertHasChanEntries(t *testing.T, c *channelCache, start, end uint64) {
t.Helper()

for i := start; i < end; i++ {
entry, ok := c.get(i)
if !ok {
t.Fatalf("channel cache should contain chan %d", i)
}

expEntry := channelForInt(i)
if !reflect.DeepEqual(entry, expEntry) {
t.Fatalf("entry mismatch, want: %v, got: %v",
expEntry, entry)
}
}
}

// channelForInt generates a unique ChannelEdge given an integer.
func channelForInt(i uint64) ChannelEdge {
return ChannelEdge{
Info: &ChannelEdgeInfo{
ChannelID: i,
},
}
}
41 changes: 31 additions & 10 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net"
"os"
"path/filepath"
"sync"
"time"

"github.com/btcsuite/btcd/btcec"
Expand Down Expand Up @@ -104,21 +103,18 @@ var (
byteOrder = binary.BigEndian
)

var bufPool = &sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}

// DB is the primary datastore for the lnd daemon. The database stores
// information related to nodes, routing data, open/closed channels, fee
// schedules, and reputation data.
type DB struct {
*bbolt.DB
dbPath string
graph *ChannelGraph
}

// Open opens an existing channeldb. Any necessary schemas migrations due to
// updates will take place as necessary.
func Open(dbPath string) (*DB, error) {
func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
path := filepath.Join(dbPath, dbName)

if !fileExists(path) {
Expand All @@ -127,6 +123,11 @@ func Open(dbPath string) (*DB, error) {
}
}

opts := DefaultOptions()
for _, modifier := range modifiers {
modifier(&opts)
}

bdb, err := bbolt.Open(path, dbFilePermission, nil)
if err != nil {
return nil, err
Expand All @@ -136,6 +137,9 @@ func Open(dbPath string) (*DB, error) {
DB: bdb,
dbPath: dbPath,
}
chanDB.graph = newChannelGraph(
chanDB, opts.RejectCacheSize, opts.ChannelCacheSize,
)

// Synchronize the version of database and apply migrations if needed.
if err := chanDB.syncVersions(dbVersions); err != nil {
Expand Down Expand Up @@ -900,9 +904,14 @@ type ChannelShell struct {
// well. This method is idempotent, so repeated calls with the same set of
// channel shells won't modify the database after the initial call.
func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
chanGraph := ChannelGraph{d}
chanGraph := d.ChannelGraph()

return d.Update(func(tx *bbolt.Tx) error {
// TODO(conner): find way to do this w/o accessing internal members?
chanGraph.cacheMu.Lock()
defer chanGraph.cacheMu.Unlock()

var chansRestored []uint64
err := d.Update(func(tx *bbolt.Tx) error {
for _, channelShell := range channelShells {
channel := channelShell.Chan

Expand Down Expand Up @@ -983,14 +992,26 @@ func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
chanEdge.ChannelFlags |= lnwire.ChanUpdateDirection
}

err = updateEdgePolicy(tx, &chanEdge)
_, err = updateEdgePolicy(tx, &chanEdge)
if err != nil {
return err
}

chansRestored = append(chansRestored, edgeInfo.ChannelID)
}

return nil
})
if err != nil {
return err
}

for _, chanid := range chansRestored {
chanGraph.rejectCache.remove(chanid)
chanGraph.chanCache.remove(chanid)
}

return nil
}

// AddrsForNode consults the graph and channel database for all addresses known
Expand Down Expand Up @@ -1112,7 +1133,7 @@ func (d *DB) syncVersions(versions []version) error {

// ChannelGraph returns a new instance of the directed channel graph.
func (d *DB) ChannelGraph() *ChannelGraph {
return &ChannelGraph{d}
return d.graph
}

func getLatestDBVersion(versions []version) uint32 {
Expand Down
Loading