Skip to content

perf(relayer): add sequencer submission strategy with blob-fee history and target price #1659

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rollup/conf/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
"batch_submission": {
"min_batches": 1,
"max_batches": 6,
"timeout": 300
"timeout": 7200,
"backlog_max": 75
},
"gas_oracle_config": {
"min_gas_price": 0,
Expand Down
2 changes: 2 additions & 0 deletions rollup/internal/config/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type BatchSubmission struct {
MaxBatches int `json:"max_batches"`
// The time in seconds after which a batch is considered stale and should be submitted ignoring the min batch count.
TimeoutSec int64 `json:"timeout"`
// The maximum number of pending batches to keep in the backlog.
BacklogMax int64 `json:"backlog_max"`
}

// ChainMonitor this config is used to get batch status from chain_monitor API.
Expand Down
197 changes: 191 additions & 6 deletions rollup/internal/controller/relayer/l2_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"math/big"
"sort"
"strings"
Expand Down Expand Up @@ -33,6 +34,32 @@ import (
rutils "scroll-tech/rollup/internal/utils"
)

// RelaxType enumerates the relaxation functions we support when
// turning a baseline fee into a “target” fee.
type RelaxType int

const (
	// NoRelaxation means “don’t touch the baseline” (i.e. fallback/default).
	NoRelaxation RelaxType = iota
	// Exponential relaxes the baseline by a factor that grows
	// exponentially as the pending batch approaches its timeout window.
	Exponential
	// Sigmoid relaxes the baseline along an S-shaped curve centered at
	// the middle of the timeout window.
	Sigmoid
)

// secondsPerBlock is the assumed L1 block time, used to convert a time
// window in seconds into an equivalent number of blocks.
const secondsPerBlock = 12

// BaselineType enumerates the baseline types we support when
// turning a baseline fee into a “target” fee.
type BaselineType int

const (
	// PctMin means “sort the last N blocks’ fees and take the
	// BaselineParam-th percentile (0–1) of the sorted window”.
	PctMin BaselineType = iota
	// EWMA means “take the exponentially‐weighted moving average of
	// the last N blocks’ fees, with α = BaselineParam”.
	EWMA
)

// Layer2Relayer is responsible for:
// i. committing and finalizing L2 blocks on L1.
// ii. updating L2 gas price oracle contract on L1.
Expand All @@ -46,6 +73,7 @@ type Layer2Relayer struct {
batchOrm *orm.Batch
chunkOrm *orm.Chunk
l2BlockOrm *orm.L2Block
l1BlockOrm *orm.L1Block

cfg *config.RelayerConfig

Expand All @@ -61,6 +89,26 @@ type Layer2Relayer struct {
metrics *l2RelayerMetrics

chainCfg *params.ChainConfig

lastFetchedBlock uint64 // highest block number ever pulled
feeHistory []*big.Int // sliding window of blob fees
batchStrategy StrategyParams
}

// StrategyParams holds the per‐window fee‐submission rules.
type StrategyParams struct {
	BaselineType  BaselineType // baseline estimator: PctMin (percentile) or EWMA
	BaselineParam float64      // percentile (0–1) for PctMin, or α for EWMA
	Gamma         float64      // relaxation amplitude γ
	Beta          float64      // relaxation steepness β
	RelaxType     RelaxType    // relaxation curve: Exponential or Sigmoid
}

// bestParams maps each supported submission window (in seconds) to the
// strategy parameters chosen for that window.
var bestParams = map[uint64]StrategyParams{
	2 * 3600:  {BaselineType: PctMin, BaselineParam: 0.10, Gamma: 0.4, Beta: 8, RelaxType: Exponential},
	5 * 3600:  {BaselineType: PctMin, BaselineParam: 0.30, Gamma: 0.6, Beta: 20, RelaxType: Sigmoid},
	12 * 3600: {BaselineType: PctMin, BaselineParam: 0.50, Gamma: 0.5, Beta: 20, RelaxType: Sigmoid},
}

// NewLayer2Relayer will return a new instance of Layer2RelayerClient
Expand Down Expand Up @@ -106,6 +154,7 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.

bundleOrm: orm.NewBundle(db),
batchOrm: orm.NewBatch(db),
l1BlockOrm: orm.NewL1Block(db),
l2BlockOrm: orm.NewL2Block(db),
chunkOrm: orm.NewChunk(db),

Expand Down Expand Up @@ -141,6 +190,9 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.
return nil, fmt.Errorf("invalid service type for l2_relayer: %v", serviceType)
}

if _, err := layer2Relayer.fetchBlobFeeHistory(uint64(layer2Relayer.cfg.BatchSubmission.TimeoutSec)); err != nil {
return nil, fmt.Errorf("initial blob‐fee load failed: %w", err)
}
return layer2Relayer, nil
}

Expand Down Expand Up @@ -274,8 +326,33 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
return
}

var batchesToSubmit []*dbBatchWithChunksAndParent
// if backlog outgrow max size, force‐submit enough oldest batches
backlogCount, err := r.batchOrm.GetFailedAndPendingBatchesCount(r.ctx)
if err != nil {
log.Error("Failed to fetch pending L2 batches", "err", err)
return
}

var forceSubmit bool

// return if not hitting target price
if backlogCount <= r.cfg.BatchSubmission.BacklogMax {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this function is much easier to read and reason about. One thing that we might consider but is probably okay for now: we always do the calculation, even if we don't have at least MinBatches. Only to then afterwards exit bc we don't have enough batches.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I changed the code to do this. One thing I did along the way is to set forceSubmit to true only if the oldest batch by index must be force-submitted, unlike before, where this was checked for all the batches in dbBatches. I argue this is OK because the batch with the smallest index must be the one submitted next. What do you think?

// if the batch with the oldest index is too old, we force submit all batches that we have so far in the next step
oldest := dbBatches[0].CreatedAt

if r.cfg.BatchSubmission.TimeoutSec > 0 && !forceSubmit && time.Since(oldest) > time.Duration(r.cfg.BatchSubmission.TimeoutSec)*time.Second {
forceSubmit = true
}
if !forceSubmit {
if skip, err := r.skipSubmitByFee(oldest); skip {
log.Debug("Skipping batch submission", "reason", err)
return
}
}
// if !skip, fall through and submit
}

var batchesToSubmit []*dbBatchWithChunksAndParent
for i, dbBatch := range dbBatches {
if i == 0 && encoding.CodecVersion(dbBatch.CodecVersion) < encoding.CodecV7 {
// if the first batch is not >= V7 then we need to submit batches one by one
Expand Down Expand Up @@ -336,11 +413,6 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
break
}

// if one of the batches is too old, we force submit all batches that we have so far in the next step
if r.cfg.BatchSubmission.TimeoutSec > 0 && !forceSubmit && time.Since(dbBatch.CreatedAt) > time.Duration(r.cfg.BatchSubmission.TimeoutSec)*time.Second {
forceSubmit = true
}

if batchesToSubmitLen < r.cfg.BatchSubmission.MaxBatches {
batchesToSubmit = append(batchesToSubmit, &dbBatchWithChunksAndParent{
Batch: dbBatch,
Expand Down Expand Up @@ -1120,6 +1192,119 @@ func (r *Layer2Relayer) StopSenders() {
}
}

// fetchBlobFeeHistory returns the last windowSec seconds of blob-fee samples,
// read from the L1Block table's BlobBaseFee column. It maintains a sliding
// window in r.feeHistory and only queries blocks newer than those already
// seen (tracked via r.lastFetchedBlock).
func (r *Layer2Relayer) fetchBlobFeeHistory(windowSec uint64) ([]*big.Int, error) {
	latest, err := r.l1BlockOrm.GetLatestL1BlockHeight(r.ctx)
	if err != nil {
		return nil, fmt.Errorf("GetLatestL1BlockHeight: %w", err)
	}

	// Bootstrap on first call: start the window windowSec seconds
	// (converted to blocks) behind the latest block. Guard against uint64
	// underflow when the chain is shorter than the window.
	if r.lastFetchedBlock == 0 {
		windowBlocks := windowSec / secondsPerBlock
		if latest > windowBlocks {
			r.lastFetchedBlock = latest - windowBlocks
		}
	}

	// Fetch only the blocks we have not seen yet.
	if from := r.lastFetchedBlock + 1; from <= latest {
		raw, err := r.l1BlockOrm.GetBlobFeesInRange(r.ctx, from, latest)
		if err != nil {
			return nil, fmt.Errorf("GetBlobFeesInRange: %w", err)
		}
		for _, v := range raw {
			r.feeHistory = append(r.feeHistory, new(big.Int).SetUint64(v))
		}
		// Advance directly to latest rather than incrementing once per
		// returned row: if the table has gaps in block numbers, per-row
		// increments would drift behind latest and re-fetch (duplicating)
		// old samples on the next call.
		r.lastFetchedBlock = latest
	}

	// Trim the history to at most windowSec worth of block samples.
	maxLen := int(windowSec / secondsPerBlock)
	if len(r.feeHistory) > maxLen {
		r.feeHistory = r.feeHistory[len(r.feeHistory)-maxLen:]
	}

	return r.feeHistory, nil
}

// calculateTargetPrice applies pct_min/ewma + relaxation to get a BigInt target
func calculateTargetPrice(windowSec uint64, strategy StrategyParams, firstTime time.Time, history []*big.Int) *big.Int {
n := len(history)
if n == 0 {
return big.NewInt(0)
}
// convert to float64 Gwei
data := make([]float64, n)
for i, v := range history {
f, _ := new(big.Float).Quo(new(big.Float).SetInt(v), big.NewFloat(1e9)).Float64()
data[i] = f
}
var baseline float64
switch strategy.BaselineType {
case PctMin:
sort.Float64s(data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably also relatively expensive after the repeated DB query. Like Peter said would be better to keep this in memory.

Maybe something like this construction https://github.com/scroll-tech/go-ethereum/blob/e62c6f08e2cb233ca0e14d01bc3f41f2edaea369/rollup/da_syncer/batch_queue.go#L20
with a sorted heap and a shrinkingmap in memory could work.

Every time you check you would only need to verify which min/max blocknumbers you expect to be in the map. if they are not there then you query them from DB and add them (this way also after startup should be handled automatically). if an entry is too old you delete it out of the heap and map.

idx := int(strategy.BaselineParam * float64(n-1))
if idx < 0 {
idx = 0
}
baseline = data[idx]
case EWMA:
alpha := strategy.BaselineParam
ewma := data[0]
for i := 1; i < n; i++ {
ewma = alpha*data[i] + (1-alpha)*ewma
}
baseline = ewma
default:
baseline = data[n-1]
}
// relaxation
age := time.Since(firstTime).Seconds()
frac := age / float64(windowSec)
var adjusted float64
switch strategy.RelaxType {
case Exponential:
adjusted = baseline * (1 + strategy.Gamma*math.Exp(strategy.Beta*(frac-1)))
case Sigmoid:
adjusted = baseline * (1 + strategy.Gamma/(1+math.Exp(-strategy.Beta*(frac-0.5))))
default:
adjusted = baseline
}
Comment on lines +1263 to +1271
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add bounds checking for relaxation formulas

The exponential and sigmoid formulas could lead to extremely large values without proper bounds checking.

Add protection against potential numerical issues:

 switch strategy.RelaxType {
 case Exponential:
-	adjusted = baseline * (1 + strategy.Gamma*math.Exp(strategy.Beta*(frac-1)))
+	exponent := strategy.Beta * (frac - 1)
+	// Prevent extreme values that could cause overflow
+	if exponent > 20 {
+		exponent = 20
+	}
+	adjusted = baseline * (1 + strategy.Gamma*math.Exp(exponent))
 case Sigmoid:
-	adjusted = baseline * (1 + strategy.Gamma/(1+math.Exp(-strategy.Beta*(frac-0.5))))
+	sigmoid := strategy.Gamma / (1 + math.Exp(-strategy.Beta*(frac-0.5)))
+	// Ensure the adjustment stays within reasonable bounds
+	if sigmoid > 10 {
+		sigmoid = 10
+	}
+	adjusted = baseline * (1 + sigmoid)
 default:
 	adjusted = baseline
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
var adjusted float64
switch strategy.RelaxType {
case Exponential:
adjusted = baseline * (1 + strategy.Gamma*math.Exp(strategy.Beta*(frac-1)))
case Sigmoid:
adjusted = baseline * (1 + strategy.Gamma/(1+math.Exp(-strategy.Beta*(frac-0.5))))
default:
adjusted = baseline
}
var adjusted float64
switch strategy.RelaxType {
case Exponential:
exponent := strategy.Beta * (frac - 1)
// Prevent extreme values that could cause overflow
if exponent > 20 {
exponent = 20
}
adjusted = baseline * (1 + strategy.Gamma*math.Exp(exponent))
case Sigmoid:
sigmoid := strategy.Gamma / (1 + math.Exp(-strategy.Beta*(frac-0.5)))
// Ensure the adjustment stays within reasonable bounds
if sigmoid > 10 {
sigmoid = 10
}
adjusted = baseline * (1 + sigmoid)
default:
adjusted = baseline
}
🤖 Prompt for AI Agents
In rollup/internal/controller/relayer/l2_relayer.go around lines 1274 to 1282,
the exponential and sigmoid relaxation formulas can produce extremely large
values that may cause numerical instability. Add bounds checking to limit the
maximum and minimum values of the computed adjusted variable after applying the
formulas. Implement safeguards such as clamping the result within a reasonable
range to prevent overflow or underflow issues.

// back to wei
f := new(big.Float).Mul(big.NewFloat(adjusted), big.NewFloat(1e9))
out, _ := f.Int(nil)
return out
}

// skipSubmitByFee returns (true, reason) when submission should be skipped
// right now because the blob fee is above target and the timeout window
// hasn't yet elapsed. It returns (false, err) when fee history is
// unavailable (callers fall back to immediate submission), and
// (false, nil) when submission should proceed.
func (r *Layer2Relayer) skipSubmitByFee(oldest time.Time) (bool, error) {
	windowSec := uint64(r.cfg.BatchSubmission.TimeoutSec)

	hist, err := r.fetchBlobFeeHistory(windowSec)
	if err != nil {
		return false, fmt.Errorf("blob-fee history unavailable; fallback to immediate submission: %w", err)
	}
	if len(hist) == 0 {
		// Handled separately from err != nil: wrapping a nil error with %w
		// would render as "%!w(<nil>)" in the message.
		return false, errors.New("blob-fee history empty; fallback to immediate submission")
	}

	// Calculate the target price and the current fee (both in wei).
	target := calculateTargetPrice(windowSec, r.batchStrategy, oldest, hist)
	current := hist[len(hist)-1]

	// If the current fee is above target and we are still inside the
	// timeout window, skip submission for now.
	if age := time.Since(oldest); current.Cmp(target) > 0 && age < time.Duration(windowSec)*time.Second {
		return true, fmt.Errorf(
			"blob-fee above target & window not yet passed; current=%s target=%s age=%s",
			current.String(), target.String(), age,
		)
	}

	// Otherwise proceed with submission.
	return false, nil
}

func addrFromSignerConfig(config *config.SignerConfig) (common.Address, error) {
switch config.SignerType {
case sender.PrivateKeySignerType:
Expand Down
12 changes: 12 additions & 0 deletions rollup/internal/orm/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,18 @@ func (o *Batch) GetRollupStatusByHashList(ctx context.Context, hashes []string)
return statuses, nil
}

// GetFailedAndPendingBatchesCount returns the number of batches whose
// rollup status is RollupCommitFailed or RollupPending, i.e. the current
// size of the submission backlog.
func (o *Batch) GetFailedAndPendingBatchesCount(ctx context.Context) (int64, error) {
	db := o.db.WithContext(ctx)
	db = db.Model(&Batch{})
	db = db.Where("rollup_status = ? OR rollup_status = ?", types.RollupCommitFailed, types.RollupPending)

	var count int64
	if err := db.Count(&count).Error; err != nil {
		return 0, fmt.Errorf("Batch.GetFailedAndPendingBatchesCount error: %w", err)
	}
	return count, nil
}

// GetFailedAndPendingBatches retrieves batches with failed or pending status up to the specified limit.
// The returned batches are sorted in ascending order by their index.
func (o *Batch) GetFailedAndPendingBatches(ctx context.Context, limit int) ([]*Batch, error) {
Expand Down
14 changes: 14 additions & 0 deletions rollup/internal/orm/l1_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ func (o *L1Block) GetL1Blocks(ctx context.Context, fields map[string]interface{}
return l1Blocks, nil
}

// GetBlobFeesInRange returns every blob_base_fee value for blocks whose
// number lies in the inclusive range [startBlock, endBlock], sorted by
// block number in ascending order.
func (o *L1Block) GetBlobFeesInRange(ctx context.Context, startBlock, endBlock uint64) ([]uint64, error) {
	query := o.db.WithContext(ctx)
	query = query.Model(&L1Block{})
	query = query.Where("number >= ? AND number <= ?", startBlock, endBlock)
	query = query.Order("number ASC")

	var fees []uint64
	if err := query.Pluck("blob_base_fee", &fees).Error; err != nil {
		return nil, fmt.Errorf("L1Block.GetBlobFeesInRange error: %w", err)
	}
	return fees, nil
}

// InsertL1Blocks batch inserts l1 blocks.
// If there's a block number conflict (e.g., due to reorg), soft deletes the existing block and inserts the new one.
func (o *L1Block) InsertL1Blocks(ctx context.Context, blocks []L1Block) error {
Expand Down
Loading