Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
549eb3c
Add redis cache.
mask-pp Jan 11, 2023
d907513
Merge branch 'staging' into cache_redis
mask-pp Jan 13, 2023
4d87025
upgrade struct
mask-pp Jan 13, 2023
1d95be5
go mod tidy
mask-pp Jan 13, 2023
99e86aa
Merge branch 'staging' into cache_redis
mask-pp Jan 13, 2023
a91401f
merge staging branch and fix conflict
mask-pp Jan 16, 2023
343d75f
fix ci and update dependence
mask-pp Jan 16, 2023
5daef83
Add redis docker
mask-pp Jan 16, 2023
8f36f18
change redis config
mask-pp Jan 16, 2023
98c8fbe
fix test case
mask-pp Jan 16, 2023
0ea2e43
Update dependence
mask-pp Jan 16, 2023
6f11ca8
Update version
mask-pp Jan 16, 2023
9adaac9
fix ci
mask-pp Jan 16, 2023
c15e267
fix ci
mask-pp Jan 16, 2023
7d06a5d
fix ci
mask-pp Jan 16, 2023
dcbcb3f
fix ci
mask-pp Jan 16, 2023
14c1c1d
change api
mask-pp Jan 17, 2023
fdc1a70
change api
mask-pp Jan 17, 2023
886ddf2
change config
mask-pp Jan 17, 2023
e08e1e7
Update database/orm_test.go
mask-pp Jan 17, 2023
8800061
change config
mask-pp Jan 17, 2023
8b4ded0
change config
mask-pp Jan 17, 2023
946bdc7
change config
mask-pp Jan 17, 2023
eaf2294
change config
mask-pp Jan 17, 2023
54a6c22
fix bug
mask-pp Jan 17, 2023
4232ed7
merge staging branch and fix conflict
mask-pp Jan 17, 2023
927492c
upgrade api
mask-pp Jan 17, 2023
f84d23d
Update coordinator/manager.go
mask-pp Jan 17, 2023
27143cb
fix ci
mask-pp Jan 17, 2023
751513a
fix ci
mask-pp Jan 17, 2023
17d83bc
fix comments
mask-pp Jan 17, 2023
8dd41e8
fix comments and add init function in l2 watcher.
mask-pp Jan 18, 2023
2b5c361
Merge branch 'staging' into cache_redis
mask-pp Jan 18, 2023
2fa4d8a
fix bug
mask-pp Jan 18, 2023
dbd0b1a
Add test case in redis.
mask-pp Jan 18, 2023
d1db7d3
Apply suggestions from code review
colinlyguo Jan 18, 2023
ffadba8
fix comments
mask-pp Jan 18, 2023
d1c636e
fix comments
mask-pp Jan 18, 2023
0e80078
fix comments
mask-pp Jan 18, 2023
79ba063
Merge branch 'staging' into cache_redis
mask-pp Jan 18, 2023
456b98b
Merge branch 'staging' into cache_redis
mask-pp Jan 18, 2023
86b9e5c
Merge branch 'staging' into cache_redis
mask-pp Jan 20, 2023
0748558
Update database/config.go
mask-pp Jan 20, 2023
049cac3
Update bridge/l1/l1_test.go
mask-pp Jan 20, 2023
38e942f
fix comments
mask-pp Jan 20, 2023
a5aef78
Merge branch 'staging' into cache_redis
mask-pp Jan 20, 2023
c3fe80a
Merge branch 'staging' into cache_redis
colinlyguo Jan 20, 2023
8689c6d
trigger ci
colinlyguo Jan 20, 2023
c0b7a9b
upgrade redis cache
mask-pp Jan 27, 2023
6cf5d76
Merge branch 'staging' into cache_redis
mask-pp Jan 27, 2023
fbd9d84
upgrade redis cache
mask-pp Jan 27, 2023
b858f12
fix comments
mask-pp Jan 28, 2023
7abcdb4
fix ci
mask-pp Jan 28, 2023
b54d15f
Fill pending batch traces.
mask-pp Jan 28, 2023
5823282
Update version.go
0xmountaintop Jan 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions bridge/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,15 @@
}
},
"db_config": {
"driver_name": "postgres",
"dsn": "postgres://admin:123456@localhost/test?sslmode=disable"
"persistence": {
"driver_name": "postgres",
"dsn": "postgres://admin:123456@localhost/test?sslmode=disable"
},
"redis": {
"url": "redis://default:@localhost:6379/0",
"expirations": {
"trace": 3600
}
}
}
}
4 changes: 2 additions & 2 deletions bridge/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.18
require (
github.com/iden3/go-iden3-crypto v0.0.13
github.com/orcaman/concurrent-map v1.0.0
github.com/scroll-tech/go-ethereum v1.10.14-0.20230112091133-2891916a0f81
github.com/scroll-tech/go-ethereum v1.10.14-0.20230113082126-cdaea939622e
github.com/stretchr/testify v1.8.0
github.com/urfave/cli/v2 v2.10.2
golang.org/x/sync v0.1.0
Expand All @@ -14,7 +14,7 @@ require (

require (
github.com/btcsuite/btcd v0.20.1-beta // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions bridge/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk=
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand Down Expand Up @@ -350,8 +350,8 @@ github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/scroll-tech/go-ethereum v1.10.14-0.20230112091133-2891916a0f81 h1:Gm18RZ9WTR2Dupumr60E2m1Noe+l9/lITt6iRyxxZoc=
github.com/scroll-tech/go-ethereum v1.10.14-0.20230112091133-2891916a0f81/go.mod h1:jurIpDQ0hqtp9//xxeWzr8X9KMP/+TYn+vz3K1wZrv0=
github.com/scroll-tech/go-ethereum v1.10.14-0.20230113082126-cdaea939622e h1:TAqAeQiQI6b+TRyqyQ6qhizqY35LhqYe8lWhG0nNRGw=
github.com/scroll-tech/go-ethereum v1.10.14-0.20230113082126-cdaea939622e/go.mod h1:jurIpDQ0hqtp9//xxeWzr8X9KMP/+TYn+vz3K1wZrv0=
github.com/scroll-tech/zktrie v0.3.0/go.mod h1:CuJFlG1/soTJJBAySxCZgTF7oPvd5qF6utHOEciC43Q=
github.com/scroll-tech/zktrie v0.3.1 h1:HlR+fMBdjXX1/7cUMqpUgGEhGy/3vN1JpwQ0ovg/Ys8=
github.com/scroll-tech/zktrie v0.3.1/go.mod h1:CuJFlG1/soTJJBAySxCZgTF7oPvd5qF6utHOEciC43Q=
Expand Down
4 changes: 2 additions & 2 deletions bridge/l1/l1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func setupEnv(t *testing.T) {
cfg.L2Config.Endpoint = l2gethImg.Endpoint()

// Create db container.
dbImg = docker.NewTestDBDocker(t, cfg.DBConfig.DriverName)
cfg.DBConfig.DSN = dbImg.Endpoint()
dbImg = docker.NewTestDBDocker(t, cfg.DBConfig.Persistence.DriverName)
cfg.DBConfig.Persistence.DSN = dbImg.Endpoint()
}

func free(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion bridge/l1/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (w *Watcher) Stop() {

const contractEventsBlocksFetchLimit = int64(10)

// FetchContractEvent pull latest event logs from given contract address and save in DB
// FetchContractEvent pull latest event logs from given contract address and save in Persistence
func (w *Watcher) FetchContractEvent(blockHeight uint64) error {
defer func() {
log.Info("l1 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight)
Expand Down
5 changes: 4 additions & 1 deletion bridge/l2/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ func New(ctx context.Context, cfg *config.L2Config, orm database.OrmFactory) (*B
return nil, err
}

l2Watcher := NewL2WatcherClient(ctx, client, cfg.Confirmations, cfg.BatchProposerConfig, cfg.L2MessengerAddress, orm)
l2Watcher, err := NewL2WatcherClient(ctx, client, cfg.Confirmations, cfg.BatchProposerConfig, cfg.L2MessengerAddress, orm)
if err != nil {
return nil, err
}

return &Backend{
cfg: cfg,
Expand Down
12 changes: 10 additions & 2 deletions bridge/l2/l2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var (
l1gethImg docker.ImgInstance
l2gethImg docker.ImgInstance
dbImg docker.ImgInstance
redisImg docker.ImgInstance

// l2geth client
l2Cli *ethclient.Client
Expand All @@ -40,8 +41,12 @@ func setupEnv(t *testing.T) (err error) {
cfg.L2Config.Endpoint = l2gethImg.Endpoint()

// Create db container.
dbImg = docker.NewTestDBDocker(t, cfg.DBConfig.DriverName)
cfg.DBConfig.DSN = dbImg.Endpoint()
dbImg = docker.NewTestDBDocker(t, cfg.DBConfig.Persistence.DriverName)
cfg.DBConfig.Persistence.DSN = dbImg.Endpoint()

// Create redis container.
redisImg = docker.NewTestRedisDocker(t)
cfg.DBConfig.Redis.URL = redisImg.Endpoint()

// Create l2geth client.
l2Cli, err = ethclient.Dial(cfg.L2Config.Endpoint)
Expand All @@ -60,6 +65,9 @@ func free(t *testing.T) {
if l2gethImg != nil {
assert.NoError(t, l2gethImg.Stop())
}
if redisImg != nil {
assert.NoError(t, redisImg.Stop())
}
}

func TestFunction(t *testing.T) {
Expand Down
18 changes: 12 additions & 6 deletions bridge/l2/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ type WatcherClient struct {
}

// NewL2WatcherClient take a l2geth instance to generate a l2watcherclient instance
func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmations uint64, bpCfg *config.BatchProposerConfig, messengerAddress common.Address, orm database.OrmFactory) *WatcherClient {
func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmations uint64, bpCfg *config.BatchProposerConfig, messengerAddress common.Address, orm database.OrmFactory) (*WatcherClient, error) {
savedHeight, err := orm.GetLayer2LatestWatchedHeight()
if err != nil {
log.Warn("fetch height from db failed", "err", err)
savedHeight = 0
}

return &WatcherClient{
watcher := &WatcherClient{
ctx: ctx,
Client: client,
orm: orm,
Expand All @@ -72,13 +72,20 @@ func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmat
stopped: 0,
batchProposer: newBatchProposer(bpCfg, orm),
}
// Init cache, if traces in cache expired reset it.
if err = watcher.initCache(ctx); err != nil {
log.Error("failed to init cache in l2 watcher")
return nil, err
}

return watcher, nil
}

// Start the Listening process
func (w *WatcherClient) Start() {
go func() {
if reflect.ValueOf(w.orm).IsNil() {
panic("must run L2 watcher with DB")
panic("must run L2 watcher with Persistence")
}

ctx, cancel := context.WithCancel(w.ctx)
Expand Down Expand Up @@ -171,7 +178,7 @@ const blockTracesFetchLimit = uint64(10)

// try fetch missing blocks if inconsistent
func (w *WatcherClient) tryFetchRunningMissingBlocks(ctx context.Context, blockHeight uint64) {
// Get newest block in DB. must have blocks at that time.
// Get newest block in Persistence. must have blocks at that time.
// Don't use "block_trace" table "trace" column's BlockTrace.Number,
// because it might be empty if the corresponding rollup_result is finalized/finalization_skipped
heightInDB, err := w.orm.GetBlockTracesLatestHeight()
Expand Down Expand Up @@ -213,7 +220,6 @@ func (w *WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uin
log.Info("retrieved block trace", "height", trace.Header.Number, "hash", trace.Header.Hash().String())

traces = append(traces, trace)

}
if len(traces) > 0 {
if err := w.orm.InsertBlockTraces(traces); err != nil {
Expand All @@ -226,7 +232,7 @@ func (w *WatcherClient) getAndStoreBlockTraces(ctx context.Context, from, to uin

const contractEventsBlocksFetchLimit = int64(10)

// FetchContractEvent pull latest event logs from given contract address and save in DB
// FetchContractEvent pull latest event logs from given contract address and save in Persistence
func (w *WatcherClient) FetchContractEvent(blockHeight uint64) {
defer func() {
log.Info("l2 watcher fetchContractEvent", "w.processedMsgHeight", w.processedMsgHeight)
Expand Down
16 changes: 16 additions & 0 deletions bridge/l2/watcher_api.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
package l2

import (
"context"

"github.com/scroll-tech/go-ethereum/core/types"
)

// WatcherAPI watcher api service
type WatcherAPI interface {
GetTracesByBatchIndex(ctx context.Context, index uint64) ([]*types.BlockTrace, error)
}

// GetTracesByBatchIndex get traces by batch_id.
func (w *WatcherClient) GetTracesByBatchIndex(ctx context.Context, index uint64) ([]*types.BlockTrace, error) {
id, err := w.orm.GetBatchIDByIndex(index)
if err != nil {
return nil, err
}
return w.orm.GetBlockTraces(map[string]interface{}{"batch_id": id})
}
120 changes: 120 additions & 0 deletions bridge/l2/watcher_init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package l2

import (
"context"
"fmt"
"math/big"
"runtime"

"github.com/scroll-tech/go-ethereum/log"
"golang.org/x/sync/errgroup"

"scroll-tech/database/cache"
"scroll-tech/database/orm"
)

func (w *WatcherClient) initCache(ctx context.Context) error {
var (
// Use at most half of the system threads.
parallel = (runtime.GOMAXPROCS(0) + 1) / 2
db = w.orm
)

// Fill unsigned block traces.
for {
batches, err := db.GetBlockBatches(
map[string]interface{}{"proving_status": orm.ProvingTaskUnassigned},
fmt.Sprintf("ORDER BY index ASC LIMIT %d;", parallel),
)
if err != nil {
log.Error("failed to get block batch", "err", err)
return err
}
if len(batches) == 0 {
break
}

var eg errgroup.Group
for _, batch := range batches {
batch := batch
eg.Go(func() error {
return w.fillTraceByNumber(ctx, batch.StartBlockNumber, batch.EndBlockNumber)
})
}
if err = eg.Wait(); err != nil {
return err
}
}

// Fill assigned and under proofing block traces into cache.
ids, err := w.orm.GetAssignedBatchIDs()
if err != nil {
return err
}
for _, id := range ids {
err = w.fillTraceByID(ctx, id)
if err != nil {
log.Error("failed to fill traces by id", "id", id, "err", err)
return err
}
}

// Fill pending block traces into cache.
for {
ids, err = w.orm.GetPendingBatches(uint64(parallel))
if err != nil {
log.Error("failed to get pending batch ids", "err", err)
return err
}
if len(ids) == 0 {
return nil
}
for _, id := range ids {
err = w.fillTraceByID(ctx, id)
if err != nil {
log.Error("failed to fill traces by id", "id", id, "err", err)
return err
}
}
}
}

// fillTraceByID Fill block traces by batch id.
func (w *WatcherClient) fillTraceByID(ctx context.Context, id string) error {
batches, err := w.orm.GetBlockBatches(map[string]interface{}{"id": id})
if err != nil || len(batches) == 0 {
return err
}
batch := batches[0]
err = w.fillTraceByNumber(ctx, batch.StartBlockNumber, batch.EndBlockNumber)
if err != nil {
return err
}
return nil
}

func (w *WatcherClient) fillTraceByNumber(ctx context.Context, start, end uint64) error {
var (
rdb = w.orm.(cache.Cache)
client = w.Client
)
for height := start; height <= end; height++ {
number := big.NewInt(0).SetUint64(height)
exist, err := rdb.ExistTrace(ctx, number)
if err != nil {
return err
}
if exist {
continue
}
trace, err := client.GetBlockTraceByNumber(ctx, number)
if err != nil {
return err
}
err = rdb.SetBlockTrace(ctx, trace)
if err != nil {
return err
}
}
return nil
}
13 changes: 8 additions & 5 deletions bridge/l2/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func testCreateNewWatcherAndStop(t *testing.T) {
defer l2db.Close()

l2cfg := cfg.L2Config
rc := NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.BatchProposerConfig, l2cfg.L2MessengerAddress, l2db)
rc, err := NewL2WatcherClient(context.Background(), l2Cli, l2cfg.Confirmations, l2cfg.BatchProposerConfig, l2cfg.L2MessengerAddress, l2db)
assert.NoError(t, err)
rc.Start()
defer rc.Stop()

Expand Down Expand Up @@ -72,7 +73,8 @@ func testMonitorBridgeContract(t *testing.T) {
address, err := bind.WaitDeployed(context.Background(), l2Cli, tx)
assert.NoError(t, err)

rc := prepareRelayerClient(l2Cli, cfg.L2Config.BatchProposerConfig, db, address)
rc, err := prepareRelayerClient(l2Cli, cfg.L2Config.BatchProposerConfig, db, address)
assert.NoError(t, err)
rc.Start()
defer rc.Stop()

Expand Down Expand Up @@ -110,7 +112,7 @@ func testMonitorBridgeContract(t *testing.T) {
// check if we successfully stored events
height, err := db.GetLayer2LatestWatchedHeight()
assert.NoError(t, err)
t.Log("Height in DB is", height)
t.Log("Height in Persistence is", height)
assert.Greater(t, height, int64(previousHeight))
msgs, err := db.GetL2Messages(map[string]interface{}{"status": orm.MsgPending})
assert.NoError(t, err)
Expand All @@ -134,7 +136,8 @@ func testFetchMultipleSentMessageInOneBlock(t *testing.T) {
address, err := bind.WaitDeployed(context.Background(), l2Cli, trx)
assert.NoError(t, err)

rc := prepareRelayerClient(l2Cli, cfg.L2Config.BatchProposerConfig, db, address)
rc, err := prepareRelayerClient(l2Cli, cfg.L2Config.BatchProposerConfig, db, address)
assert.NoError(t, err)
rc.Start()
defer rc.Stop()

Expand Down Expand Up @@ -189,7 +192,7 @@ func testFetchMultipleSentMessageInOneBlock(t *testing.T) {
assert.Equal(t, 5, len(msgs))
}

func prepareRelayerClient(l2Cli *ethclient.Client, bpCfg *config.BatchProposerConfig, db database.OrmFactory, contractAddr common.Address) *WatcherClient {
func prepareRelayerClient(l2Cli *ethclient.Client, bpCfg *config.BatchProposerConfig, db database.OrmFactory, contractAddr common.Address) (*WatcherClient, error) {
return NewL2WatcherClient(context.Background(), l2Cli, 0, bpCfg, contractAddr, db)
}

Expand Down
Loading