Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 4 additions & 61 deletions core/service/blockMainService.go
Original file line number Diff line number Diff line change
Expand Up @@ -1279,10 +1279,8 @@ func (bs *BlockService) GetBlockExtendedInfo(block *model.Block, includeReceipts

func (bs *BlockService) PopOffToBlock(commonBlock *model.Block) ([]*model.Block, error) {
var (
mempoolsBackupBytes *bytes.Buffer
mempoolsBackup []*model.MempoolTransaction
publishedReceipts []*model.PublishedReceipt
err error
publishedReceipts []*model.PublishedReceipt
err error
)
// if current blockchain Height is lower than minimal height of the blockchain that is allowed to rollback
lastBlock, err := bs.GetLastBlock()
Expand Down Expand Up @@ -1328,72 +1326,17 @@ func (bs *BlockService) PopOffToBlock(commonBlock *model.Block) ([]*model.Block,
}

// Backup existing transactions from mempool before rollback
mempoolsBackup, err = bs.MempoolService.GetMempoolTransactionsWantToBackup(commonBlock.Height)
err = bs.MempoolService.BackupMempools(commonBlock)
if err != nil {
return nil, err
}
bs.Logger.Warnf("mempool tx backup %d in total with block_height %d", len(mempoolsBackup), commonBlock.GetHeight())
derivedQueries := query.GetDerivedQuery(bs.Chaintype)
err = bs.QueryExecutor.BeginTx()
if err != nil {
return nil, err
}

mempoolsBackupBytes = bytes.NewBuffer([]byte{})

for _, mempool := range mempoolsBackup {
var (
tx *model.Transaction
txType transaction.TypeAction
)
tx, err := bs.TransactionUtil.ParseTransactionBytes(mempool.GetTransactionBytes(), true)
if err != nil {
return nil, err
}
txType, err = bs.ActionTypeSwitcher.GetTransactionType(tx)
if err != nil {
return nil, err
}

err = bs.TransactionCoreService.UndoApplyUnconfirmedTransaction(txType)
if err != nil {
return nil, err
}

/*
mempoolsBackupBytes format is
[...{4}byteSize,{bytesSize}transactionBytes]
*/
sizeMempool := uint32(len(mempool.GetTransactionBytes()))
mempoolsBackupBytes.Write(commonUtils.ConvertUint32ToBytes(sizeMempool))
mempoolsBackupBytes.Write(mempool.GetTransactionBytes())
}

for _, dQuery := range derivedQueries {
queries := dQuery.Rollback(commonBlock.Height)
err = bs.QueryExecutor.ExecuteTransactions(queries)
if err != nil {
_ = bs.QueryExecutor.RollbackTx()
return nil, err
}
}
err = bs.QueryExecutor.CommitTx()
if err != nil {
return nil, err
}
//
// TODO: here we should also delete all snapshot files relative to the block manifests being rolled back during derived tables
// rollback. Something like this:
// - before rolling back derived queries, select all spine block manifest records from commonBlock.Height till last
// - delete all snapshots referenced by them
//
if mempoolsBackupBytes.Len() > 0 {
kvdbMempoolsBackupKey := commonUtils.GetKvDbMempoolDBKey(bs.GetChainType())
err = bs.KVExecutor.Insert(kvdbMempoolsBackupKey, mempoolsBackupBytes.Bytes(), int(constant.KVDBMempoolsBackupExpiry))
if err != nil {
return nil, err
}
}

// remove peer memoization
bs.NodeRegistrationService.ResetScrambledNodes()
// clear block pool
Expand Down
123 changes: 72 additions & 51 deletions core/service/blockMainService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4109,12 +4109,20 @@ func (*mockMempoolServiceBlockPopSuccess) GetMempoolTransactionsWantToBackup(
return make([]*model.MempoolTransaction, 0), nil
}

func (*mockMempoolServiceBlockPopSuccess) BackupMempools(commonBlock *model.Block) error {
return nil
}

func (*mockMempoolServiceBlockPopFail) GetMempoolTransactionsWantToBackup(
height uint32,
) ([]*model.MempoolTransaction, error) {
return nil, errors.New("mockedError")
}

func (*mockMempoolServiceBlockPopFail) BackupMempools(commonBlock *model.Block) error {
return errors.New("error BackupMempools")
}

func (*mockReceiptSuccess) GetPublishedReceiptsByHeight(blockHeight uint32) ([]*model.PublishedReceipt, error) {
return make([]*model.PublishedReceipt, 0), nil
}
Expand Down Expand Up @@ -4315,27 +4323,37 @@ func (*mockedExecutorPopOffToBlockSuccessPopping) ExecuteSelectRow(qStr string,

func TestBlockService_PopOffToBlock(t *testing.T) {
type fields struct {
Chaintype chaintype.ChainType
KVExecutor kvdb.KVExecutorInterface
QueryExecutor query.ExecutorInterface
BlockQuery query.BlockQueryInterface
MempoolQuery query.MempoolQueryInterface
TransactionQuery query.TransactionQueryInterface
MerkleTreeQuery query.MerkleTreeQueryInterface
PublishedReceiptQuery query.PublishedReceiptQueryInterface
SkippedBlocksmithQuery query.SkippedBlocksmithQueryInterface
Signature crypto.SignatureInterface
MempoolService MempoolServiceInterface
ReceiptService ReceiptServiceInterface
NodeRegistrationService NodeRegistrationServiceInterface
ActionTypeSwitcher transaction.TypeActionSwitcher
AccountBalanceQuery query.AccountBalanceQueryInterface
ParticipationScoreQuery query.ParticipationScoreQueryInterface
NodeRegistrationQuery query.NodeRegistrationQueryInterface
BlockPoolService BlockPoolServiceInterface
TransactionCoreService TransactionCoreServiceInterface
Observer *observer.Observer
Logger *log.Logger
RWMutex sync.RWMutex
Chaintype chaintype.ChainType
KVExecutor kvdb.KVExecutorInterface
QueryExecutor query.ExecutorInterface
BlockQuery query.BlockQueryInterface
MempoolQuery query.MempoolQueryInterface
TransactionQuery query.TransactionQueryInterface
PublishedReceiptQuery query.PublishedReceiptQueryInterface
SkippedBlocksmithQuery query.SkippedBlocksmithQueryInterface
Signature crypto.SignatureInterface
MempoolService MempoolServiceInterface
ReceiptService ReceiptServiceInterface
NodeRegistrationService NodeRegistrationServiceInterface
BlocksmithService BlocksmithServiceInterface
ActionTypeSwitcher transaction.TypeActionSwitcher
AccountBalanceQuery query.AccountBalanceQueryInterface
ParticipationScoreQuery query.ParticipationScoreQueryInterface
NodeRegistrationQuery query.NodeRegistrationQueryInterface
AccountLedgerQuery query.AccountLedgerQueryInterface
BlocksmithStrategy strategy.BlocksmithStrategyInterface
BlockIncompleteQueueService BlockIncompleteQueueServiceInterface
BlockPoolService BlockPoolServiceInterface
Observer *observer.Observer
Logger *log.Logger
TransactionUtil transaction.UtilInterface
ReceiptUtil coreUtil.ReceiptUtilInterface
PublishedReceiptUtil coreUtil.PublishedReceiptUtilInterface
TransactionCoreService TransactionCoreServiceInterface
CoinbaseService CoinbaseServiceInterface
ParticipationScoreService ParticipationScoreServiceInterface
PublishedReceiptService PublishedReceiptServiceInterface
}
type args struct {
commonBlock *model.Block
Expand All @@ -4356,7 +4374,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) {
BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}),
MempoolQuery: nil,
TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}),
MerkleTreeQuery: nil,
PublishedReceiptQuery: nil,
SkippedBlocksmithQuery: nil,
Signature: nil,
Expand Down Expand Up @@ -4386,7 +4403,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) {
BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}),
MempoolQuery: nil,
TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}),
MerkleTreeQuery: nil,
PublishedReceiptQuery: nil,
SkippedBlocksmithQuery: nil,
Signature: nil,
Expand Down Expand Up @@ -4417,7 +4433,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) {
BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}),
MempoolQuery: nil,
TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}),
MerkleTreeQuery: nil,
PublishedReceiptQuery: nil,
SkippedBlocksmithQuery: nil,
Signature: nil,
Expand Down Expand Up @@ -4448,7 +4463,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) {
BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}),
MempoolQuery: nil,
TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}),
MerkleTreeQuery: nil,
PublishedReceiptQuery: nil,
SkippedBlocksmithQuery: nil,
Signature: nil,
Expand Down Expand Up @@ -4479,7 +4493,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) {
BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}),
MempoolQuery: nil,
TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}),
MerkleTreeQuery: nil,
PublishedReceiptQuery: nil,
SkippedBlocksmithQuery: nil,
Signature: nil,
Expand Down Expand Up @@ -4510,7 +4523,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) {
BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}),
MempoolQuery: nil,
TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}),
MerkleTreeQuery: nil,
PublishedReceiptQuery: nil,
SkippedBlocksmithQuery: nil,
Signature: nil,
Expand Down Expand Up @@ -4541,7 +4553,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) {
BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}),
MempoolQuery: nil,
TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}),
MerkleTreeQuery: nil,
PublishedReceiptQuery: nil,
SkippedBlocksmithQuery: nil,
Signature: nil,
Expand Down Expand Up @@ -4584,38 +4595,48 @@ func TestBlockService_PopOffToBlock(t *testing.T) {
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bs := &BlockService{
Chaintype: tt.fields.Chaintype,
KVExecutor: tt.fields.KVExecutor,
QueryExecutor: tt.fields.QueryExecutor,
BlockQuery: tt.fields.BlockQuery,
MempoolQuery: tt.fields.MempoolQuery,
TransactionQuery: tt.fields.TransactionQuery,
PublishedReceiptQuery: tt.fields.PublishedReceiptQuery,
SkippedBlocksmithQuery: tt.fields.SkippedBlocksmithQuery,
Signature: tt.fields.Signature,
MempoolService: tt.fields.MempoolService,
ReceiptService: tt.fields.ReceiptService,
NodeRegistrationService: tt.fields.NodeRegistrationService,
ActionTypeSwitcher: tt.fields.ActionTypeSwitcher,
AccountBalanceQuery: tt.fields.AccountBalanceQuery,
ParticipationScoreQuery: tt.fields.ParticipationScoreQuery,
NodeRegistrationQuery: tt.fields.NodeRegistrationQuery,
BlockPoolService: tt.fields.BlockPoolService,
TransactionCoreService: tt.fields.TransactionCoreService,
Observer: tt.fields.Observer,
Logger: tt.fields.Logger,
RWMutex: tt.fields.RWMutex,
Chaintype: tt.fields.Chaintype,
KVExecutor: tt.fields.KVExecutor,
QueryExecutor: tt.fields.QueryExecutor,
BlockQuery: tt.fields.BlockQuery,
MempoolQuery: tt.fields.MempoolQuery,
TransactionQuery: tt.fields.TransactionQuery,
PublishedReceiptQuery: tt.fields.PublishedReceiptQuery,
SkippedBlocksmithQuery: tt.fields.SkippedBlocksmithQuery,
Signature: tt.fields.Signature,
MempoolService: tt.fields.MempoolService,
ReceiptService: tt.fields.ReceiptService,
NodeRegistrationService: tt.fields.NodeRegistrationService,
BlocksmithService: tt.fields.BlocksmithService,
ActionTypeSwitcher: tt.fields.ActionTypeSwitcher,
AccountBalanceQuery: tt.fields.AccountBalanceQuery,
ParticipationScoreQuery: tt.fields.ParticipationScoreQuery,
NodeRegistrationQuery: tt.fields.NodeRegistrationQuery,
AccountLedgerQuery: tt.fields.AccountLedgerQuery,
BlocksmithStrategy: tt.fields.BlocksmithStrategy,
BlockIncompleteQueueService: tt.fields.BlockIncompleteQueueService,
BlockPoolService: tt.fields.BlockPoolService,
Observer: tt.fields.Observer,
Logger: tt.fields.Logger,
TransactionUtil: tt.fields.TransactionUtil,
ReceiptUtil: tt.fields.ReceiptUtil,
PublishedReceiptUtil: tt.fields.PublishedReceiptUtil,
TransactionCoreService: tt.fields.TransactionCoreService,
CoinbaseService: tt.fields.CoinbaseService,
ParticipationScoreService: tt.fields.ParticipationScoreService,
PublishedReceiptService: tt.fields.PublishedReceiptService,
}
got, err := bs.PopOffToBlock(tt.args.commonBlock)
if (err != nil) != tt.wantErr {
t.Errorf("PopOffToBlock() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("BlockService.PopOffToBlock() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("PopOffToBlock() got = \n%v, want \n%v", got, tt.want)
t.Errorf("BlockService.PopOffToBlock() = %v, want %v", got, tt.want)
}
})
}
Expand Down
Loading