Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions core/txpool/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,7 @@ func (h *priceHeap) Pop() interface{} {
// the floating heap is better. When baseFee is decreasing they behave similarly.
type pricedList struct {
// Number of stale price points to (re-heap trigger).
// This field is accessed atomically, and must be the first field
// to ensure it has correct alignment for atomic.AddInt64.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
stales int64
stales atomic.Int64

all *lookup // Pointer to the map of all transactions
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
Expand Down Expand Up @@ -545,7 +542,7 @@ func (l *pricedList) Put(tx *types.Transaction, local bool) {
// the heap if a large enough ratio of transactions go stale.
func (l *pricedList) Removed(count int) {
// Bump the stale counter, but exit if still too low (< 25%)
stales := atomic.AddInt64(&l.stales, int64(count))
stales := l.stales.Add(int64(count))
if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 {
return
}
Expand All @@ -570,7 +567,7 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool {
for len(h.list) > 0 {
head := h.list[0]
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
atomic.AddInt64(&l.stales, -1)
l.stales.Add(-1)
heap.Pop(h)
continue
}
Expand All @@ -597,7 +594,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.urgent).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
atomic.AddInt64(&l.stales, -1)
l.stales.Add(-1)
continue
}
// Non stale transaction found, move to floating heap
Expand All @@ -610,7 +607,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.floating).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
atomic.AddInt64(&l.stales, -1)
l.stales.Add(-1)
continue
}
// Non stale transaction found, discard it
Expand All @@ -633,7 +630,7 @@ func (l *pricedList) Reheap() {
l.reheapMu.Lock()
defer l.reheapMu.Unlock()
start := time.Now()
atomic.StoreInt64(&l.stales, 0)
l.stales.Store(0)
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
l.urgent.list = append(l.urgent.list, tx)
Expand Down
3 changes: 1 addition & 2 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"math/big"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -383,7 +382,7 @@ func (pool *TxPool) loop() {
pool.mu.RLock()
pending, queued := pool.stats()
pool.mu.RUnlock()
stales := int(atomic.LoadInt64(&pool.priced.stales))
stales := int(pool.priced.stales.Load())

if pending != prevPending || queued != prevQueued || stales != prevStales {
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
Expand Down
6 changes: 3 additions & 3 deletions core/txpool/txpool2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestTransactionFutureAttack(t *testing.T) {

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
config := testTxPoolConfig
config.GlobalQueue = 100
config.GlobalSlots = 100
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestTransactionFuture1559(t *testing.T) {
t.Parallel()
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain)
defer pool.Stop()

Expand Down Expand Up @@ -147,7 +147,7 @@ func TestTransactionZAttack(t *testing.T) {
t.Parallel()
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain)
defer pool.Stop()
// Create a number of test accounts, fund them and make transactions
Expand Down
52 changes: 29 additions & 23 deletions core/txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,21 @@ func init() {
}

type testBlockChain struct {
gasLimit uint64 // must be first field for 64 bit alignment (atomic access)
gasLimit atomic.Uint64
statedb *state.StateDB
chainHeadFeed *event.Feed
}

func newTestBlockChain(gasLimit uint64, statedb *state.StateDB, chainHeadFeed *event.Feed) *testBlockChain {
bc := testBlockChain{statedb: statedb, chainHeadFeed: new(event.Feed)}
bc.gasLimit.Store(gasLimit)
return &bc
}

func (bc *testBlockChain) CurrentBlock() *types.Header {
return &types.Header{
Number: new(big.Int),
GasLimit: atomic.LoadUint64(&bc.gasLimit),
GasLimit: bc.gasLimit.Load(),
}
}

Expand Down Expand Up @@ -121,7 +127,7 @@ func setupPool() (*TxPool, *ecdsa.PrivateKey) {

func setupPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateKey) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{10000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(10000000, statedb, new(event.Feed))

key, _ := crypto.GenerateKey()
pool := NewTxPool(testTxPoolConfig, config, blockchain)
Expand Down Expand Up @@ -236,7 +242,7 @@ func TestStateChangeDuringReset(t *testing.T) {

// setup pool with 2 transaction in it
statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether))
blockchain := &testChain{&testBlockChain{1000000000, statedb, new(event.Feed)}, address, &trigger}
blockchain := &testChain{newTestBlockChain(1000000000, statedb, new(event.Feed)), address, &trigger}

tx0 := transaction(0, 100000, key)
tx1 := transaction(1, 100000, key)
Expand Down Expand Up @@ -430,7 +436,7 @@ func TestChainFork(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
statedb.AddBalance(addr, big.NewInt(100000000000000))

pool.chain = &testBlockChain{1000000, statedb, new(event.Feed)}
pool.chain = newTestBlockChain(1000000, statedb, new(event.Feed))
<-pool.requestReset(nil, nil)
}
resetState()
Expand Down Expand Up @@ -459,7 +465,7 @@ func TestDoubleNonce(t *testing.T) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
statedb.AddBalance(addr, big.NewInt(100000000000000))

pool.chain = &testBlockChain{1000000, statedb, new(event.Feed)}
pool.chain = newTestBlockChain(1000000, statedb, new(event.Feed))
<-pool.requestReset(nil, nil)
}
resetState()
Expand Down Expand Up @@ -629,7 +635,7 @@ func TestDropping(t *testing.T) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4)
}
// Reduce the block gas limit, check that invalidated transactions are dropped
atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100)
pool.chain.(*testBlockChain).gasLimit.Store(100)
<-pool.requestReset(nil, nil)

if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
Expand Down Expand Up @@ -657,7 +663,7 @@ func TestPostponing(t *testing.T) {

// Create the pool to test the postponing with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
Expand Down Expand Up @@ -869,7 +875,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) {

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.NoLocals = nolocals
Expand Down Expand Up @@ -961,7 +967,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {

// Create the pool to test the non-expiration enforcement
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.Lifetime = time.Second
Expand Down Expand Up @@ -1146,7 +1152,7 @@ func TestPendingGlobalLimiting(t *testing.T) {

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.GlobalSlots = config.AccountSlots * 10
Expand Down Expand Up @@ -1248,7 +1254,7 @@ func TestCapClearsFromAll(t *testing.T) {

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.AccountSlots = 2
Expand Down Expand Up @@ -1282,7 +1288,7 @@ func TestPendingMinimumAllowance(t *testing.T) {

// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.GlobalSlots = 1
Expand Down Expand Up @@ -1330,7 +1336,7 @@ func TestRepricing(t *testing.T) {

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
Expand Down Expand Up @@ -1578,7 +1584,7 @@ func TestRepricingKeepsLocals(t *testing.T) {

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain)
defer pool.Stop()
Expand Down Expand Up @@ -1651,7 +1657,7 @@ func TestUnderpricing(t *testing.T) {

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.GlobalSlots = 2
Expand Down Expand Up @@ -1765,7 +1771,7 @@ func TestStableUnderpricing(t *testing.T) {

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.GlobalSlots = 128
Expand Down Expand Up @@ -1997,7 +2003,7 @@ func TestDeduplication(t *testing.T) {

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
Expand Down Expand Up @@ -2063,7 +2069,7 @@ func TestReplacement(t *testing.T) {

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
Expand Down Expand Up @@ -2268,7 +2274,7 @@ func testJournaling(t *testing.T, nolocals bool) {

// Create the original pool to inject transaction into the journal
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

config := testTxPoolConfig
config.NoLocals = nolocals
Expand Down Expand Up @@ -2310,7 +2316,7 @@ func testJournaling(t *testing.T, nolocals bool) {
// Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive
pool.Stop()
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain = newTestBlockChain(1000000, statedb, new(event.Feed))

pool = NewTxPool(config, params.TestChainConfig, blockchain)

Expand All @@ -2337,7 +2343,7 @@ func testJournaling(t *testing.T, nolocals bool) {
pool.Stop()

statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain = newTestBlockChain(1000000, statedb, new(event.Feed))
pool = NewTxPool(config, params.TestChainConfig, blockchain)

pending, queued = pool.Stats()
Expand Down Expand Up @@ -2366,7 +2372,7 @@ func TestStatusCheck(t *testing.T) {

// Create the pool to test the status retrievals with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := &testBlockChain{1000000, statedb, new(event.Feed)}
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))

pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
Expand Down