diff --git a/core/blockchainsync/blockPopper.go b/core/blockchainsync/blockPopper.go index b7112ebb1..708262ca8 100644 --- a/core/blockchainsync/blockPopper.go +++ b/core/blockchainsync/blockPopper.go @@ -23,6 +23,7 @@ type BlockPopper struct { ChainType chaintype.ChainType ActionTypeSwitcher transaction.TypeActionSwitcher KVDB kvdb.KVExecutorInterface + Logger *log.Logger } // PopOffToBlock will remove the block in current Chain until commonBlock is reached @@ -41,8 +42,8 @@ func (bp *BlockPopper) PopOffToBlock(commonBlock *model.Block) ([]*model.Block, minRollbackHeight := getMinRollbackHeight(lastBlock.Height) if commonBlock.Height < minRollbackHeight { - // TODO: handle it appropriately and analyze the effect if this returning empty element in the further processfork pocess - log.Warn("the node blockchain detects hardfork, please manually delete the database to recover") + // TODO: handle it appropriately and analyze the effect if this returning empty element in the further processfork process + bp.Logger.Warn("the node blockchain detects hardfork, please manually delete the database to recover") return []*model.Block{}, nil } @@ -73,7 +74,7 @@ func (bp *BlockPopper) PopOffToBlock(commonBlock *model.Block) ([]*model.Block, if err != nil { return nil, err } - + bp.Logger.Warnf("mempool tx backup %d in total with block_height %d", len(mempoolsBackup), commonBlock.GetHeight()) derivedQueries := query.GetDerivedQuery(bp.ChainType) err = bp.QueryExecutor.BeginTx() if err != nil { @@ -117,11 +118,11 @@ func (bp *BlockPopper) PopOffToBlock(commonBlock *model.Block) ([]*model.Block, if err != nil { return nil, err } + /* mempoolsBackupBytes format is - [...{4}byteSize,[bytesSize]transactionBytes] + [...{4}byteSize,{bytesSize}transactionBytes] */ - // TODO: Need to restore the backups from badger while getting mempool transactions before PushBlock process sizeMempool := uint32(len(mempool.GetTransactionBytes())) mempoolsBackupBytes.Write(util.ConvertUint32ToBytes(sizeMempool)) mempoolsBackupBytes.Write(mempool.GetTransactionBytes()) diff --git a/core/blockchainsync/blockPopper_test.go b/core/blockchainsync/blockPopper_test.go index ad2fc101d..03e4fabc7 100644 --- a/core/blockchainsync/blockPopper_test.go +++ b/core/blockchainsync/blockPopper_test.go @@ -8,6 +8,7 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/golang/mock/gomock" + "github.com/sirupsen/logrus" "github.com/zoobc/zoobc-core/common/chaintype" "github.com/zoobc/zoobc-core/common/kvdb" "github.com/zoobc/zoobc-core/common/model" @@ -124,6 +125,7 @@ func TestService_PopOffToBlock(t *testing.T) { ChainType chaintype.ChainType ActionTypeSwitcher transaction.TypeActionSwitcher KVDB kvdb.KVExecutorInterface + Logger *logrus.Logger } type args struct { commonBlock *model.Block @@ -173,6 +175,7 @@ func TestService_PopOffToBlock(t *testing.T) { BlockService: &mockServiceBlockFailGetBlockByID{}, ChainType: &mockServiceChainType{}, QueryExecutor: &mockServiceQueryExecutor{}, + Logger: logrus.New(), }, args: args{ commonBlock: &model.Block{ @@ -204,6 +207,7 @@ func TestService_PopOffToBlock(t *testing.T) { nil, nil, ), + Logger: logrus.New(), }, args: args{ commonBlock: &model.Block{ @@ -234,6 +238,7 @@ func TestService_PopOffToBlock(t *testing.T) { nil, nil, ), + Logger: logrus.New(), }, args: args{ commonBlock: &model.Block{ @@ -266,6 +271,7 @@ func TestService_PopOffToBlock(t *testing.T) { ), ActionTypeSwitcher: &transaction.TypeSwitcher{Executor: &mockPopOffToBlockReturnCommonBlock{}}, KVDB: kvdb.NewMockKVExecutorInterface(gomock.NewController(t)), + Logger: logrus.New(), }, args: args{ commonBlock: &model.Block{ @@ -286,6 +292,7 @@ func TestService_PopOffToBlock(t *testing.T) { MempoolService: tt.fields.MempoolService, ActionTypeSwitcher: tt.fields.ActionTypeSwitcher, KVDB: tt.fields.KVDB, + Logger: tt.fields.Logger, } got, err := bp.PopOffToBlock(tt.args.commonBlock) if (err != nil) != tt.wantErr { diff --git a/core/blockchainsync/blockchainSync.go b/core/blockchainsync/blockchainSync.go index 2fe03e95e..178d2c98a 100644 --- a/core/blockchainsync/blockchainSync.go +++ b/core/blockchainsync/blockchainSync.go @@ -57,6 +57,7 @@ func NewBlockchainSyncService(blockService service.BlockServiceInterface, QueryExecutor: queryExecutor, ActionTypeSwitcher: txActionSwitcher, MempoolService: mempoolService, + Logger: logger, BlockPopper: &BlockPopper{ ChainType: blockService.GetChainType(), BlockService: blockService, @@ -64,6 +65,7 @@ func NewBlockchainSyncService(blockService service.BlockServiceInterface, QueryExecutor: queryExecutor, ActionTypeSwitcher: txActionSwitcher, KVDB: kvdb, + Logger: logger, }, }, Logger: logger, diff --git a/core/blockchainsync/processFork.go b/core/blockchainsync/processFork.go index 7405cd29c..e2702f43b 100644 --- a/core/blockchainsync/processFork.go +++ b/core/blockchainsync/processFork.go @@ -8,6 +8,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/zoobc/zoobc-core/common/blocker" "github.com/zoobc/zoobc-core/common/chaintype" + "github.com/zoobc/zoobc-core/common/constant" "github.com/zoobc/zoobc-core/common/model" "github.com/zoobc/zoobc-core/common/query" "github.com/zoobc/zoobc-core/common/transaction" @@ -26,15 +27,18 @@ type ( QueryExecutor query.ExecutorInterface ActionTypeSwitcher transaction.TypeActionSwitcher MempoolService service.MempoolServiceInterface + Logger *log.Logger } ) // ProcessFork processes the forked blocks func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock *model.Block, feederPeer *model.Peer) error { + var ( - err error - myPoppedOffBlocks, peerPoppedOffBlocks []*model.Block lastBlockBeforeProcess, lastBlock, currentLastBlock *model.Block + myPoppedOffBlocks, peerPoppedOffBlocks []*model.Block + pushedForkBlocks int + err error ) lastBlockBeforeProcess, err = fp.BlockService.GetLastBlock() @@ -47,8 +51,6 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock * return err } - pushedForkBlocks := 0 - lastBlock, err = fp.BlockService.GetLastBlock() if err != nil { return err @@ -70,13 +72,13 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock * if err != nil { // TODO: analyze the mechanism of blacklisting peer here // bd.P2pService.Blacklist(peer) - log.Warnf("[pushing fork block] failed to verify block %v from peer: %s\nwith previous: %v\n", block.ID, err, lastBlock.ID) + fp.Logger.Warnf("[pushing fork block] failed to verify block %v from peer: %s\nwith previous: %v\n", block.ID, err, lastBlock.ID) } err = fp.BlockService.PushBlock(lastBlock, block, false) if err != nil { // TODO: blacklist the wrong peer // fp.P2pService.Blacklist(feederPeer) - log.Warnf("\n\nPushBlock err %v\n\n", err) + fp.Logger.Warnf("\n\nPushBlock err %v\n\n", err) break } pushedForkBlocks++ @@ -107,7 +109,7 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock * // if no fork blocks successfully applied, go back to our fork // other wise, just take the transactions of our popped blocks to be processed later if pushedForkBlocks == 0 { - log.Println("Did not accept any blocks from peer, pushing back my blocks") + fp.Logger.Println("Did not accept any blocks from peer, pushing back my blocks") for _, block := range myPoppedOffBlocks { lastBlock, err = fp.BlockService.GetLastBlock() if err != nil { @@ -117,7 +119,7 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock * if err != nil { // TODO: analyze the mechanism of blacklisting peer here // bd.P2pService.Blacklist(peer) - log.Warnf("[pushing back own block] failed to verify block %v from peer: %s\n with previous: %v\n", block.ID, err, lastBlock.ID) + fp.Logger.Warnf("[pushing back own block] failed to verify block %v from peer: %s\n with previous: %v\n", block.ID, err, lastBlock.ID) return err } err = fp.BlockService.PushBlock(lastBlock, block, false) @@ -131,6 +133,11 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock * } } + // start restoring mempool from badgerDB + err = fp.restoreMempoolsBackup() + if err != nil { + fp.Logger.Errorf("RestoreBackupFail: %s", err.Error()) + } return nil } @@ -198,3 +205,80 @@ func (fp *ForkingProcessor) ProcessLater(txs []*model.Transaction) error { func (fp *ForkingProcessor) ScheduleScan(height uint32, validate bool) { // TODO: analyze if this mechanism is necessary } + +// restoreMempoolsBackup will restore transaction from badgerDB and try to re-ApplyUnconfirmed +func (fp *ForkingProcessor) restoreMempoolsBackup() error { + + var ( + mempoolsBackupBytes []byte + prev uint32 + err error + ) + + mempoolsBackupBytes, err = fp.BlockPopper.KVDB.Get(constant.KVDBMempoolsBackup) + if err != nil { + return err + } + + for int(prev) < len(mempoolsBackupBytes) { + var ( + transactionBytes []byte + mempoolTX *model.MempoolTransaction + txType transaction.TypeAction + tx *model.Transaction + size uint32 + ) + + prev += constant.TransactionBodyLength // initiate length of size + size = commonUtil.ConvertBytesToUint32(mempoolsBackupBytes[:prev]) + transactionBytes = mempoolsBackupBytes[prev:][:size] + prev += size + + tx, err = transaction.ParseTransactionBytes(transactionBytes, true) + if err != nil { + return err + } + mempoolTX = &model.MempoolTransaction{ + FeePerByte: commonUtil.FeePerByteTransaction(tx.GetFee(), transactionBytes), + ID: tx.ID, + TransactionBytes: transactionBytes, + ArrivalTimestamp: time.Now().Unix(), + SenderAccountAddress: tx.SenderAccountAddress, + RecipientAccountAddress: tx.RecipientAccountAddress, + } + err = fp.MempoolService.ValidateMempoolTransaction(mempoolTX) + if err != nil { + return err + } + + txType, err = fp.ActionTypeSwitcher.GetTransactionType(tx) + if err != nil { + return err + } + // Apply Unconfirmed + err = fp.QueryExecutor.BeginTx() + if err != nil { + return err + } + err = txType.ApplyUnconfirmed() + if err != nil { + err = fp.QueryExecutor.RollbackTx() + if err != nil { + return err + } + return err + } + err = fp.MempoolService.AddMempoolTransaction(mempoolTX) + if err != nil { + err = fp.QueryExecutor.RollbackTx() + if err != nil { + return err + } + } + err = fp.QueryExecutor.CommitTx() + if err != nil { + return err + } + } + return nil +} diff --git a/core/blockchainsync/processFork_test.go b/core/blockchainsync/processFork_test.go index b24971db8..992b070d4 100644 --- a/core/blockchainsync/processFork_test.go +++ b/core/blockchainsync/processFork_test.go @@ -15,10 +15,6 @@ type mockServiceBlockSuccess struct { service.BlockServiceInterface } -type mockServiceForkingProcessSuccess struct { - ForkingProcessorInterface -} - type mockServiceChainType struct { chaintype.ChainType } @@ -40,27 +36,10 @@ type mockServiceBlockFailGetBlockByID struct { service.BlockServiceInterface } -// FORKING SERVICE FAILS -type mockServiceForkingProcessFail struct { - ForkingProcessorInterface -} - -// SERVICE QUERY EXECUTOR FAILS -type mockServiceQueryExecutorBeginTXFail struct { - query.ExecutorInterface -} -type mockServiceQueryExecutorExecuteTransFail struct { - query.ExecutorInterface -} type mockServiceQueryExecutorCommitTXFail struct { query.ExecutorInterface } -// Function mock for Forking interface -func (*mockServiceForkingProcessSuccess) getMinRollbackHeight() (uint32, error) { - return 20, nil -} - // Mock function for Block interface func (*mockServiceBlockSuccess) GetLastBlock() (*model.Block, error) { return &model.Block{ID: 58, Height: 66}, nil @@ -209,55 +188,6 @@ func (*mockServiceBlockFailGetBlockByID) GetTransactionsByBlockID(blockID int64) return transaction, nil } -// FORKING PPROCESS SERVICE FAIL -func (*mockServiceForkingProcessFail) getMinRollbackHeight() (uint32, error) { - return 0, blocker.NewBlocker( - blocker.AuthErr, - "ERROR WHEN GETTING MINIMAL HEIGHT FOR ROLLBACK", - ) -} - -// QUERY EXECUTOR SERVICE FAILS -// BEGIN TX FUNC FAIL -func (*mockServiceQueryExecutorBeginTXFail) BeginTx() error { - return blocker.NewBlocker( - blocker.AuthErr, - "failed to begin TX", - ) -} - -func (*mockServiceQueryExecutorBeginTXFail) ExecuteTransaction(qStr string, args ...interface{}) error { - return nil -} - -func (*mockServiceQueryExecutorBeginTXFail) CommitTx() error { - return nil -} - -func (*mockServiceQueryExecutorBeginTXFail) RollbackTx() error { - return nil -} - -// EXECUTE TRANSACTION FAIL -func (*mockServiceQueryExecutorExecuteTransFail) BeginTx() error { - return nil -} - -func (*mockServiceQueryExecutorExecuteTransFail) ExecuteTransactions(queries [][]interface{}) error { - return blocker.NewBlocker( - blocker.AuthErr, - "failed to execute Transactions", - ) -} - -func (*mockServiceQueryExecutorExecuteTransFail) CommitTx() error { - return nil -} - -func (*mockServiceQueryExecutorExecuteTransFail) RollbackTx() error { - return nil -} - // COMMITX FAIL func (*mockServiceQueryExecutorCommitTXFail) BeginTx() error { return nil diff --git a/core/service/mempoolCoreService.go b/core/service/mempoolCoreService.go index 4202f16b0..785017441 100644 --- a/core/service/mempoolCoreService.go +++ b/core/service/mempoolCoreService.go @@ -132,7 +132,7 @@ func (mps *MempoolService) GetMempoolTransaction(id int64) (*model.MempoolTransa return nil, blocker.NewBlocker(blocker.DBRowNotFound, "MempoolTransactionNotFound") } -// AddMempoolTransaction validates and insert a transaction into the mempool +// AddMempoolTransaction validates and insert a transaction into the mempool and also set the BlockHeight as well func (mps *MempoolService) AddMempoolTransaction(mpTx *model.MempoolTransaction) error { // check maximum mempool