diff --git a/core/service/blockMainService.go b/core/service/blockMainService.go index fb5154757..87b6aa764 100644 --- a/core/service/blockMainService.go +++ b/core/service/blockMainService.go @@ -1279,10 +1279,8 @@ func (bs *BlockService) GetBlockExtendedInfo(block *model.Block, includeReceipts func (bs *BlockService) PopOffToBlock(commonBlock *model.Block) ([]*model.Block, error) { var ( - mempoolsBackupBytes *bytes.Buffer - mempoolsBackup []*model.MempoolTransaction - publishedReceipts []*model.PublishedReceipt - err error + publishedReceipts []*model.PublishedReceipt + err error ) // if current blockchain Height is lower than minimal height of the blockchain that is allowed to rollback lastBlock, err := bs.GetLastBlock() @@ -1328,72 +1326,17 @@ func (bs *BlockService) PopOffToBlock(commonBlock *model.Block) ([]*model.Block, } // Backup existing transactions from mempool before rollback - mempoolsBackup, err = bs.MempoolService.GetMempoolTransactionsWantToBackup(commonBlock.Height) + err = bs.MempoolService.BackupMempools(commonBlock) if err != nil { return nil, err } - bs.Logger.Warnf("mempool tx backup %d in total with block_height %d", len(mempoolsBackup), commonBlock.GetHeight()) - derivedQueries := query.GetDerivedQuery(bs.Chaintype) - err = bs.QueryExecutor.BeginTx() - if err != nil { - return nil, err - } - - mempoolsBackupBytes = bytes.NewBuffer([]byte{}) - - for _, mempool := range mempoolsBackup { - var ( - tx *model.Transaction - txType transaction.TypeAction - ) - tx, err := bs.TransactionUtil.ParseTransactionBytes(mempool.GetTransactionBytes(), true) - if err != nil { - return nil, err - } - txType, err = bs.ActionTypeSwitcher.GetTransactionType(tx) - if err != nil { - return nil, err - } - err = bs.TransactionCoreService.UndoApplyUnconfirmedTransaction(txType) - if err != nil { - return nil, err - } - - /* - mempoolsBackupBytes format is - [...{4}byteSize,{bytesSize}transactionBytes] - */ - sizeMempool := uint32(len(mempool.GetTransactionBytes())) - mempoolsBackupBytes.Write(commonUtils.ConvertUint32ToBytes(sizeMempool)) - mempoolsBackupBytes.Write(mempool.GetTransactionBytes()) - } - - for _, dQuery := range derivedQueries { - queries := dQuery.Rollback(commonBlock.Height) - err = bs.QueryExecutor.ExecuteTransactions(queries) - if err != nil { - _ = bs.QueryExecutor.RollbackTx() - return nil, err - } - } - err = bs.QueryExecutor.CommitTx() - if err != nil { - return nil, err - } - // // TODO: here we should also delete all snapshot files relative to the block manifests being rolled back during derived tables // rollback. Something like this: // - before rolling back derived queries, select all spine block manifest records from commonBlock.Height till last // - delete all snapshots referenced by them // - if mempoolsBackupBytes.Len() > 0 { - kvdbMempoolsBackupKey := commonUtils.GetKvDbMempoolDBKey(bs.GetChainType()) - err = bs.KVExecutor.Insert(kvdbMempoolsBackupKey, mempoolsBackupBytes.Bytes(), int(constant.KVDBMempoolsBackupExpiry)) - if err != nil { - return nil, err - } - } + // remove peer memoization bs.NodeRegistrationService.ResetScrambledNodes() // clear block pool diff --git a/core/service/blockMainService_test.go b/core/service/blockMainService_test.go index 6dd70bddf..df99437c3 100644 --- a/core/service/blockMainService_test.go +++ b/core/service/blockMainService_test.go @@ -4109,12 +4109,20 @@ func (*mockMempoolServiceBlockPopSuccess) GetMempoolTransactionsWantToBackup( return make([]*model.MempoolTransaction, 0), nil } +func (*mockMempoolServiceBlockPopSuccess) BackupMempools(commonBlock *model.Block) error { + return nil +} + func (*mockMempoolServiceBlockPopFail) GetMempoolTransactionsWantToBackup( height uint32, ) ([]*model.MempoolTransaction, error) { return nil, errors.New("mockedError") } +func (*mockMempoolServiceBlockPopFail) BackupMempools(commonBlock *model.Block) error { + return errors.New("error BackupMempools") +} + func (*mockReceiptSuccess) GetPublishedReceiptsByHeight(blockHeight uint32) ([]*model.PublishedReceipt, error) { return make([]*model.PublishedReceipt, 0), nil } @@ -4315,27 +4323,37 @@ func (*mockedExecutorPopOffToBlockSuccessPopping) ExecuteSelectRow(qStr string, func TestBlockService_PopOffToBlock(t *testing.T) { type fields struct { - Chaintype chaintype.ChainType - KVExecutor kvdb.KVExecutorInterface - QueryExecutor query.ExecutorInterface - BlockQuery query.BlockQueryInterface - MempoolQuery query.MempoolQueryInterface - TransactionQuery query.TransactionQueryInterface - MerkleTreeQuery query.MerkleTreeQueryInterface - PublishedReceiptQuery query.PublishedReceiptQueryInterface - SkippedBlocksmithQuery query.SkippedBlocksmithQueryInterface - Signature crypto.SignatureInterface - MempoolService MempoolServiceInterface - ReceiptService ReceiptServiceInterface - NodeRegistrationService NodeRegistrationServiceInterface - ActionTypeSwitcher transaction.TypeActionSwitcher - AccountBalanceQuery query.AccountBalanceQueryInterface - ParticipationScoreQuery query.ParticipationScoreQueryInterface - NodeRegistrationQuery query.NodeRegistrationQueryInterface - BlockPoolService BlockPoolServiceInterface - TransactionCoreService TransactionCoreServiceInterface - Observer *observer.Observer - Logger *log.Logger + RWMutex sync.RWMutex + Chaintype chaintype.ChainType + KVExecutor kvdb.KVExecutorInterface + QueryExecutor query.ExecutorInterface + BlockQuery query.BlockQueryInterface + MempoolQuery query.MempoolQueryInterface + TransactionQuery query.TransactionQueryInterface + PublishedReceiptQuery query.PublishedReceiptQueryInterface + SkippedBlocksmithQuery query.SkippedBlocksmithQueryInterface + Signature crypto.SignatureInterface + MempoolService MempoolServiceInterface + ReceiptService ReceiptServiceInterface + NodeRegistrationService NodeRegistrationServiceInterface + BlocksmithService BlocksmithServiceInterface + ActionTypeSwitcher transaction.TypeActionSwitcher + AccountBalanceQuery query.AccountBalanceQueryInterface + ParticipationScoreQuery query.ParticipationScoreQueryInterface + NodeRegistrationQuery query.NodeRegistrationQueryInterface + AccountLedgerQuery query.AccountLedgerQueryInterface + BlocksmithStrategy strategy.BlocksmithStrategyInterface + BlockIncompleteQueueService BlockIncompleteQueueServiceInterface + BlockPoolService BlockPoolServiceInterface + Observer *observer.Observer + Logger *log.Logger + TransactionUtil transaction.UtilInterface + ReceiptUtil coreUtil.ReceiptUtilInterface + PublishedReceiptUtil coreUtil.PublishedReceiptUtilInterface + TransactionCoreService TransactionCoreServiceInterface + CoinbaseService CoinbaseServiceInterface + ParticipationScoreService ParticipationScoreServiceInterface + PublishedReceiptService PublishedReceiptServiceInterface } type args struct { commonBlock *model.Block @@ -4356,7 +4374,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) { BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}), MempoolQuery: nil, TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}), - MerkleTreeQuery: nil, PublishedReceiptQuery: nil, SkippedBlocksmithQuery: nil, Signature: nil, @@ -4386,7 +4403,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) { BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}), MempoolQuery: nil, TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}), - MerkleTreeQuery: nil, PublishedReceiptQuery: nil, SkippedBlocksmithQuery: nil, Signature: nil, @@ -4417,7 +4433,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) { BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}), MempoolQuery: nil, TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}), - MerkleTreeQuery: nil, PublishedReceiptQuery: nil, SkippedBlocksmithQuery: nil, Signature: nil, @@ -4448,7 +4463,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) { BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}), MempoolQuery: nil, TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}), - MerkleTreeQuery: nil, PublishedReceiptQuery: nil, SkippedBlocksmithQuery: nil, Signature: nil, @@ -4479,7 +4493,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) { BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}), MempoolQuery: nil, TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}), - MerkleTreeQuery: nil, PublishedReceiptQuery: nil, SkippedBlocksmithQuery: nil, Signature: nil, @@ -4510,7 +4523,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) { BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}), MempoolQuery: nil, TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}), - MerkleTreeQuery: nil, PublishedReceiptQuery: nil, SkippedBlocksmithQuery: nil, Signature: nil, @@ -4541,7 +4553,6 @@ func TestBlockService_PopOffToBlock(t *testing.T) { BlockQuery: query.NewBlockQuery(&chaintype.MainChain{}), MempoolQuery: nil, TransactionQuery: query.NewTransactionQuery(&chaintype.MainChain{}), - MerkleTreeQuery: nil, PublishedReceiptQuery: nil, SkippedBlocksmithQuery: nil, Signature: nil, @@ -4584,38 +4595,48 @@ func TestBlockService_PopOffToBlock(t *testing.T) { wantErr: false, }, } - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { bs := &BlockService{ - Chaintype: tt.fields.Chaintype, - KVExecutor: tt.fields.KVExecutor, - QueryExecutor: tt.fields.QueryExecutor, - BlockQuery: tt.fields.BlockQuery, - MempoolQuery: tt.fields.MempoolQuery, - TransactionQuery: tt.fields.TransactionQuery, - PublishedReceiptQuery: tt.fields.PublishedReceiptQuery, - SkippedBlocksmithQuery: tt.fields.SkippedBlocksmithQuery, - Signature: tt.fields.Signature, - MempoolService: tt.fields.MempoolService, - ReceiptService: tt.fields.ReceiptService, - NodeRegistrationService: tt.fields.NodeRegistrationService, - ActionTypeSwitcher: tt.fields.ActionTypeSwitcher, - AccountBalanceQuery: tt.fields.AccountBalanceQuery, - ParticipationScoreQuery: tt.fields.ParticipationScoreQuery, - NodeRegistrationQuery: tt.fields.NodeRegistrationQuery, - BlockPoolService: tt.fields.BlockPoolService, - TransactionCoreService: tt.fields.TransactionCoreService, - Observer: tt.fields.Observer, - Logger: tt.fields.Logger, + RWMutex: tt.fields.RWMutex, + Chaintype: tt.fields.Chaintype, + KVExecutor: tt.fields.KVExecutor, + QueryExecutor: tt.fields.QueryExecutor, + BlockQuery: tt.fields.BlockQuery, + MempoolQuery: tt.fields.MempoolQuery, + TransactionQuery: tt.fields.TransactionQuery, + PublishedReceiptQuery: tt.fields.PublishedReceiptQuery, + SkippedBlocksmithQuery: tt.fields.SkippedBlocksmithQuery, + Signature: tt.fields.Signature, + MempoolService: tt.fields.MempoolService, + ReceiptService: tt.fields.ReceiptService, + NodeRegistrationService: tt.fields.NodeRegistrationService, + BlocksmithService: tt.fields.BlocksmithService, + ActionTypeSwitcher: tt.fields.ActionTypeSwitcher, + AccountBalanceQuery: tt.fields.AccountBalanceQuery, + ParticipationScoreQuery: tt.fields.ParticipationScoreQuery, + NodeRegistrationQuery: tt.fields.NodeRegistrationQuery, + AccountLedgerQuery: tt.fields.AccountLedgerQuery, + BlocksmithStrategy: tt.fields.BlocksmithStrategy, + BlockIncompleteQueueService: tt.fields.BlockIncompleteQueueService, + BlockPoolService: tt.fields.BlockPoolService, + Observer: tt.fields.Observer, + Logger: tt.fields.Logger, + TransactionUtil: tt.fields.TransactionUtil, + ReceiptUtil: tt.fields.ReceiptUtil, + PublishedReceiptUtil: tt.fields.PublishedReceiptUtil, + TransactionCoreService: tt.fields.TransactionCoreService, + CoinbaseService: tt.fields.CoinbaseService, + ParticipationScoreService: tt.fields.ParticipationScoreService, + PublishedReceiptService: tt.fields.PublishedReceiptService, } got, err := bs.PopOffToBlock(tt.args.commonBlock) if (err != nil) != tt.wantErr { - t.Errorf("PopOffToBlock() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("BlockService.PopOffToBlock() error = %v, wantErr %v", err, tt.wantErr) return } if !reflect.DeepEqual(got, tt.want) { - t.Errorf("PopOffToBlock() got = \n%v, want \n%v", got, tt.want) + t.Errorf("BlockService.PopOffToBlock() = %v, want %v", got, tt.want) } }) } diff --git a/core/service/mempoolCoreService.go b/core/service/mempoolCoreService.go index 91256c989..a54375cba 100644 --- a/core/service/mempoolCoreService.go +++ b/core/service/mempoolCoreService.go @@ -1,6 +1,7 @@ package service import ( + "bytes" "database/sql" "sort" "time" @@ -16,6 +17,7 @@ import ( "github.com/zoobc/zoobc-core/common/query" "github.com/zoobc/zoobc-core/common/transaction" "github.com/zoobc/zoobc-core/common/util" + commonUtils "github.com/zoobc/zoobc-core/common/util" coreUtil "github.com/zoobc/zoobc-core/core/util" "github.com/zoobc/zoobc-core/observer" "golang.org/x/crypto/sha3" @@ -44,6 +46,7 @@ type ( ) ([]*model.BatchReceipt, error) DeleteExpiredMempoolTransactions() error GetMempoolTransactionsWantToBackup(height uint32) ([]*model.MempoolTransaction, error) + BackupMempools(commonBlock *model.Block) error } // MempoolService contains all transactions in mempool plus a mux to manage locks in concurrency @@ -66,6 +69,7 @@ type ( ReceiptUtil coreUtil.ReceiptUtilInterface ReceiptService ReceiptServiceInterface TransactionCoreService TransactionCoreServiceInterface + BlockService BlockService } ) @@ -461,3 +465,77 @@ func (mps *MempoolService) GetMempoolTransactionsWantToBackup(height uint32) ([] return mempools, nil } + +func (mps *MempoolService) BackupMempools(commonBlock *model.Block) error { + + var ( + mempoolsBackupBytes *bytes.Buffer + mempoolsBackup []*model.MempoolTransaction + err error + ) + + mempoolsBackup, err = mps.GetMempoolTransactionsWantToBackup(commonBlock.Height) + if err != nil { + return err + } + mps.Logger.Warnf("mempool tx backup %d in total with block_height %d", len(mempoolsBackup), commonBlock.GetHeight()) + derivedQueries := query.GetDerivedQuery(mps.Chaintype) + err = mps.QueryExecutor.BeginTx() + if err != nil { + return err + } + + mempoolsBackupBytes = bytes.NewBuffer([]byte{}) + + for _, mempool := range mempoolsBackup { + var ( + tx *model.Transaction + txType transaction.TypeAction + ) + tx, err := mps.TransactionUtil.ParseTransactionBytes(mempool.GetTransactionBytes(), true) + if err != nil { + return err + } + txType, err = mps.ActionTypeSwitcher.GetTransactionType(tx) + if err != nil { + return err + } + + err = mps.TransactionCoreService.UndoApplyUnconfirmedTransaction(txType) + if err != nil { + _ = mps.QueryExecutor.RollbackTx() + return err + } + + /* + mempoolsBackupBytes format is + [...{4}byteSize,{bytesSize}transactionBytes] + */ + sizeMempool := uint32(len(mempool.GetTransactionBytes())) + mempoolsBackupBytes.Write(commonUtils.ConvertUint32ToBytes(sizeMempool)) + mempoolsBackupBytes.Write(mempool.GetTransactionBytes()) + } + + for _, dQuery := range derivedQueries { + queries := dQuery.Rollback(commonBlock.Height) + err = mps.QueryExecutor.ExecuteTransactions(queries) + if err != nil { + _ = mps.QueryExecutor.RollbackTx() + return err + } + } + err = mps.QueryExecutor.CommitTx() + if err != nil { + return err + } + + if mempoolsBackupBytes.Len() > 0 { + kvdbMempoolsBackupKey := commonUtils.GetKvDbMempoolDBKey(mps.BlockService.GetChainType()) + err = mps.KVExecutor.Insert(kvdbMempoolsBackupKey, mempoolsBackupBytes.Bytes(), int(constant.KVDBMempoolsBackupExpiry)) + if err != nil { + return err + } + } + + return nil +}