From c5372412840a542d464e0d9e8718bd407bbb8190 Mon Sep 17 00:00:00 2001 From: astaphobia Date: Fri, 8 Nov 2019 14:39:45 +0800 Subject: [PATCH 1/4] initiate restoreMempoolsBackup func --- core/blockchainsync/processFork.go | 84 ++++++++++++++++++++++++++++-- core/service/mempoolCoreService.go | 2 +- 2 files changed, 81 insertions(+), 5 deletions(-) diff --git a/core/blockchainsync/processFork.go b/core/blockchainsync/processFork.go index f594094fa..8ade25e31 100644 --- a/core/blockchainsync/processFork.go +++ b/core/blockchainsync/processFork.go @@ -8,6 +8,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/zoobc/zoobc-core/common/blocker" "github.com/zoobc/zoobc-core/common/chaintype" + "github.com/zoobc/zoobc-core/common/constant" "github.com/zoobc/zoobc-core/common/model" "github.com/zoobc/zoobc-core/common/query" "github.com/zoobc/zoobc-core/common/transaction" @@ -32,9 +33,10 @@ type ( // ProcessFork processes the forked blocks func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock *model.Block, feederPeer *model.Peer) error { var ( - err error - myPoppedOffBlocks, peerPoppedOffBlocks []*model.Block lastBlockBeforeProcess, lastBlock, currentLastBlock *model.Block + myPoppedOffBlocks, peerPoppedOffBlocks []*model.Block + pushedForkBlocks int + err error ) lastBlockBeforeProcess, err = fp.BlockService.GetLastBlock() @@ -47,8 +49,6 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock * return err } - pushedForkBlocks := 0 - lastBlock, err = fp.BlockService.GetLastBlock() if err != nil { return err @@ -198,3 +198,79 @@ func (fp *ForkingProcessor) ProcessLater(txs []*model.Transaction) error { func (fp *ForkingProcessor) ScheduleScan(height uint32, validate bool) { // TODO: analyze if this mechanism is necessary } + +// restoreMempoolsBackup will restore transaction from badgerDB and try to re-ApplyUnconfirmed +func (fp *ForkingProcessor) restoreMempoolsBackup() error { + + var ( + mempoolsBackupBytes []byte + nextLength uint32 + err error + ) + + mempoolsBackupBytes, err = fp.BlockPopper.KVDB.Get(constant.KVDBMempoolsBackup) + if err != nil { + return err + } + + for { + var ( + mempoolTX *model.MempoolTransaction + txType transaction.TypeAction + tx *model.Transaction + transactionSize uint32 + transactionBytes []byte + ) + + transactionSize = commonUtil.ConvertBytesToUint32(mempoolsBackupBytes[:4]) + nextLength += transactionSize + 4 + transactionBytes = mempoolsBackupBytes[:nextLength] + + tx, err = transaction.ParseTransactionBytes(transactionBytes, true) + if err != nil { + return err + } + mempoolTX = &model.MempoolTransaction{ + FeePerByte: commonUtil.FeePerByteTransaction(tx.GetFee(), transactionBytes), + ID: tx.ID, + TransactionBytes: transactionBytes, + ArrivalTimestamp: time.Now().Unix(), + SenderAccountAddress: tx.SenderAccountAddress, + RecipientAccountAddress: tx.RecipientAccountAddress, + } + err = fp.MempoolService.ValidateMempoolTransaction(mempoolTX) + if err != nil { + return err + } + + txType, err = fp.ActionTypeSwitcher.GetTransactionType(tx) + if err != nil { + return err + } + // Apply Unconfirmed + err = fp.QueryExecutor.BeginTx() + if err != nil { + return err + } + err = txType.ApplyUnconfirmed() + if err != nil { + errRollback := fp.QueryExecutor.RollbackTx() + if errRollback != nil { + return errRollback + } + return err + } + err = fp.MempoolService.AddMempoolTransaction(mempoolTX) + if err != nil { + errRollback := fp.QueryExecutor.RollbackTx() + if errRollback != nil { + return err + } + return err + } + err = fp.QueryExecutor.CommitTx() + if err != nil { + return err + } + } +} diff --git a/core/service/mempoolCoreService.go b/core/service/mempoolCoreService.go index 4202f16b0..785017441 100644 --- a/core/service/mempoolCoreService.go +++ b/core/service/mempoolCoreService.go @@ -132,7 +132,7 @@ func (mps *MempoolService) GetMempoolTransaction(id int64) (*model.MempoolTransa return nil, blocker.NewBlocker(blocker.DBRowNotFound, "MempoolTransactionNotFound") } -// AddMempoolTransaction validates and insert a transaction into the mempool +// AddMempoolTransaction validates and insert a transaction into the mempool and also set the BlockHeight as well func (mps *MempoolService) AddMempoolTransaction(mpTx *model.MempoolTransaction) error { // check maximum mempool From cfa0aa8c921bb312e96ff822fd7434f59c818b30 Mon Sep 17 00:00:00 2001 From: astaphobia Date: Sun, 10 Nov 2019 02:50:04 +0800 Subject: [PATCH 2/4] finished mempool backup process and add loggger into processfork struct --- core/blockchainsync/blockPopper.go | 6 +-- core/blockchainsync/blockchainSync.go | 1 + core/blockchainsync/processFork.go | 31 +++++++---- core/blockchainsync/processFork_test.go | 70 ------------------------- 4 files changed, 24 insertions(+), 84 deletions(-) diff --git a/core/blockchainsync/blockPopper.go b/core/blockchainsync/blockPopper.go index b7112ebb1..29c70d736 100644 --- a/core/blockchainsync/blockPopper.go +++ b/core/blockchainsync/blockPopper.go @@ -41,7 +41,7 @@ func (bp *BlockPopper) PopOffToBlock(commonBlock *model.Block) ([]*model.Block, minRollbackHeight := getMinRollbackHeight(lastBlock.Height) if commonBlock.Height < minRollbackHeight { - // TODO: handle it appropriately and analyze the effect if this returning empty element in the further processfork pocess + // TODO: handle it appropriately and analyze the effect if this returning empty element in the further processfork process log.Warn("the node blockchain detects hardfork, please manually delete the database to recover") return []*model.Block{}, nil } @@ -73,7 +73,7 @@ func (bp *BlockPopper) PopOffToBlock(commonBlock *model.Block) ([]*model.Block, if err != nil { return nil, err } - + log.Warnf("mempool tx backup %d in total with block_height %d", len(mempoolsBackup), commonBlock.GetHeight()) derivedQueries := query.GetDerivedQuery(bp.ChainType) err = bp.QueryExecutor.BeginTx() if err != nil { @@ -119,7 +119,7 @@ func (bp *BlockPopper) PopOffToBlock(commonBlock *model.Block) ([]*model.Block, } /* mempoolsBackupBytes format is - [...{4}byteSize,[bytesSize]transactionBytes] + [...{4}byteSize,{bytesSize}transactionBytes] */ // TODO: Need to restore the backups from badger while getting mempool transactions before PushBlock process sizeMempool := uint32(len(mempool.GetTransactionBytes())) diff --git a/core/blockchainsync/blockchainSync.go b/core/blockchainsync/blockchainSync.go index c687222ac..92ad89657 100644 --- a/core/blockchainsync/blockchainSync.go +++ b/core/blockchainsync/blockchainSync.go @@ -57,6 +57,7 @@ func NewBlockchainSyncService(blockService service.BlockServiceInterface, QueryExecutor: queryExecutor, ActionTypeSwitcher: txActionSwitcher, MempoolService: mempoolService, + Logger: logger, BlockPopper: &BlockPopper{ ChainType: blockService.GetChainType(), BlockService: blockService, diff --git a/core/blockchainsync/processFork.go b/core/blockchainsync/processFork.go index 8ade25e31..165f3b102 100644 --- a/core/blockchainsync/processFork.go +++ b/core/blockchainsync/processFork.go @@ -27,11 +27,13 @@ type ( QueryExecutor query.ExecutorInterface ActionTypeSwitcher transaction.TypeActionSwitcher MempoolService service.MempoolServiceInterface + Logger *log.Logger } ) // ProcessFork processes the forked blocks func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock *model.Block, feederPeer *model.Peer) error { + var ( lastBlockBeforeProcess, lastBlock, currentLastBlock *model.Block myPoppedOffBlocks, peerPoppedOffBlocks []*model.Block @@ -70,13 +72,13 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock * if err != nil { // TODO: analyze the mechanism of blacklisting peer here // bd.P2pService.Blacklist(peer) - log.Warnf("[pushing fork block] failed to verify block %v from peer: %s\nwith previous: %v\n", block.ID, err, lastBlock.ID) + fp.Logger.Warnf("[pushing fork block] failed to verify block %v from peer: %s\nwith previous: %v\n", block.ID, err, lastBlock.ID) } err = fp.BlockService.PushBlock(lastBlock, block, false, false) if err != nil { // TODO: blacklist the wrong peer // fp.P2pService.Blacklist(feederPeer) - log.Warnf("\n\nPushBlock err %v\n\n", err) + fp.Logger.Warnf("\n\nPushBlock err %v\n\n", err) break } pushedForkBlocks++ @@ -107,7 +109,7 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock * // if no fork blocks successfully applied, go back to our fork // other wise, just take the transactions of our popped blocks to be processed later if pushedForkBlocks == 0 { - log.Println("Did not accept any blocks from peer, pushing back my blocks") + fp.Logger.Println("Did not accept any blocks from peer, pushing back my blocks") for _, block := range myPoppedOffBlocks { lastBlock, err = fp.BlockService.GetLastBlock() if err != nil { @@ -117,7 +119,7 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock * if err != nil { // TODO: analyze the mechanism of blacklisting peer here // bd.P2pService.Blacklist(peer) - log.Warnf("[pushing back own block] failed to verify block %v from peer: %s\n with previous: %v\n", block.ID, err, lastBlock.ID) + fp.Logger.Warnf("[pushing back own block] failed to verify block %v from peer: %s\n with previous: %v\n", block.ID, err, lastBlock.ID) return err } err = fp.BlockService.PushBlock(lastBlock, block, false, false) @@ -131,6 +133,11 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock * } } + // start restoring mempool from badgerDB + err = fp.restoreMempoolsBackup() + if err != nil { + fp.Logger.Errorf("RestoreBackupFail: %s", err.Error()) + } return nil } @@ -204,7 +211,7 @@ func (fp *ForkingProcessor) restoreMempoolsBackup() error { var ( mempoolsBackupBytes []byte - nextLength uint32 + prev uint32 err error ) @@ -213,18 +220,19 @@ func (fp *ForkingProcessor) restoreMempoolsBackup() error { return err } - for { + for int(prev) < len(mempoolsBackupBytes) { var ( + transactionBytes []byte mempoolTX *model.MempoolTransaction txType transaction.TypeAction tx *model.Transaction - transactionSize uint32 - transactionBytes []byte + size uint32 ) - transactionSize = commonUtil.ConvertBytesToUint32(mempoolsBackupBytes[:4]) - nextLength += transactionSize + 4 - transactionBytes = mempoolsBackupBytes[:nextLength] + prev += 4 // initiate length of size + size = commonUtil.ConvertBytesToUint32(mempoolsBackupBytes[:prev]) + transactionBytes = mempoolsBackupBytes[prev:][:size] + prev += size tx, err = transaction.ParseTransactionBytes(transactionBytes, true) if err != nil { @@ -273,4 +281,5 @@ func (fp *ForkingProcessor) restoreMempoolsBackup() error { return err } } + return nil } diff --git a/core/blockchainsync/processFork_test.go b/core/blockchainsync/processFork_test.go index b24971db8..992b070d4 100644 --- a/core/blockchainsync/processFork_test.go +++ b/core/blockchainsync/processFork_test.go @@ -15,10 +15,6 @@ type mockServiceBlockSuccess struct { service.BlockServiceInterface } -type mockServiceForkingProcessSuccess struct { - ForkingProcessorInterface -} - type mockServiceChainType struct { chaintype.ChainType } @@ -40,27 +36,10 @@ type mockServiceBlockFailGetBlockByID struct { service.BlockServiceInterface } -// FORKING SERVICE FAILS -type mockServiceForkingProcessFail struct { - ForkingProcessorInterface -} - -// SERVICE QUERY EXECUTOR FAILS -type mockServiceQueryExecutorBeginTXFail struct { - query.ExecutorInterface -} -type mockServiceQueryExecutorExecuteTransFail struct { - query.ExecutorInterface -} type mockServiceQueryExecutorCommitTXFail struct { query.ExecutorInterface } -// Function mock for Forking interface -func (*mockServiceForkingProcessSuccess) getMinRollbackHeight() (uint32, error) { - return 20, nil -} - // Mock function for Block interface func (*mockServiceBlockSuccess) GetLastBlock() (*model.Block, error) { return &model.Block{ID: 58, Height: 66}, nil @@ -209,55 +188,6 @@ func (*mockServiceBlockFailGetBlockByID) GetTransactionsByBlockID(blockID int64) return transaction, nil } -// FORKING PPROCESS SERVICE FAIL -func (*mockServiceForkingProcessFail) getMinRollbackHeight() (uint32, error) { - return 0, blocker.NewBlocker( - blocker.AuthErr, - "ERROR WHEN GETTING MINIMAL HEIGHT FOR ROLLBACK", - ) -} - -// QUERY EXECUTOR SERVICE FAILS -// BEGIN TX FUNC FAIL -func (*mockServiceQueryExecutorBeginTXFail) BeginTx() error { - return blocker.NewBlocker( - blocker.AuthErr, - "failed to begin TX", - ) -} - -func (*mockServiceQueryExecutorBeginTXFail) ExecuteTransaction(qStr string, args ...interface{}) error { - return nil -} - -func (*mockServiceQueryExecutorBeginTXFail) CommitTx() error { - return nil -} - -func (*mockServiceQueryExecutorBeginTXFail) RollbackTx() error { - return nil -} - -// EXECUTE TRANSACTION FAIL -func (*mockServiceQueryExecutorExecuteTransFail) BeginTx() error { - return nil -} - -func (*mockServiceQueryExecutorExecuteTransFail) ExecuteTransactions(queries [][]interface{}) error { - return blocker.NewBlocker( - blocker.AuthErr, - "failed to execute Transactions", - ) -} - -func (*mockServiceQueryExecutorExecuteTransFail) CommitTx() error { - return nil -} - -func (*mockServiceQueryExecutorExecuteTransFail) RollbackTx() error { - return nil -} - // COMMITX FAIL func (*mockServiceQueryExecutorCommitTXFail) BeginTx() error { return nil From 208252ace7b504405995d11bab66405ae3607dfd Mon Sep 17 00:00:00 2001 From: astaphobia Date: Sun, 10 Nov 2019 02:54:15 +0800 Subject: [PATCH 3/4] fetch latest schema/origin/develop --- common/schema | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/schema b/common/schema index 463e665f7..29a43f883 160000 --- a/common/schema +++ b/common/schema @@ -1 +1 @@ -Subproject commit 463e665f79c49f9e250d10f9fb69c49401c82788 +Subproject commit 29a43f8830e8edc78de40fb81d16c6791845386e From e0625e386a9c8efb6ef06736848a72c601120eb3 Mon Sep 17 00:00:00 2001 From: astaphobia Date: Thu, 14 Nov 2019 14:02:41 +0800 Subject: [PATCH 4/4] add logger into blockPopper stuff --- core/blockchainsync/blockPopper.go | 7 ++++--- core/blockchainsync/blockPopper_test.go | 7 +++++++ core/blockchainsync/blockchainSync.go | 1 + core/blockchainsync/processFork.go | 13 ++++++------- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/core/blockchainsync/blockPopper.go b/core/blockchainsync/blockPopper.go index 29c70d736..708262ca8 100644 --- a/core/blockchainsync/blockPopper.go +++ b/core/blockchainsync/blockPopper.go @@ -23,6 +23,7 @@ type BlockPopper struct { ChainType chaintype.ChainType ActionTypeSwitcher transaction.TypeActionSwitcher KVDB kvdb.KVExecutorInterface + Logger *log.Logger } // PopOffToBlock will remove the block in current Chain until commonBlock is reached @@ -42,7 +43,7 @@ func (bp *BlockPopper) PopOffToBlock(commonBlock *model.Block) ([]*model.Block, if commonBlock.Height < minRollbackHeight { // TODO: handle it appropriately and analyze the effect if this returning empty element in the further processfork process - log.Warn("the node blockchain detects hardfork, please manually delete the database to recover") + bp.Logger.Warn("the node blockchain detects hardfork, please manually delete the database to recover") return []*model.Block{}, nil } @@ -73,7 +74,7 @@ func (bp *BlockPopper) PopOffToBlock(commonBlock *model.Block) ([]*model.Block, if err != nil { return nil, err } - log.Warnf("mempool tx backup %d in total with block_height %d", len(mempoolsBackup), commonBlock.GetHeight()) + bp.Logger.Warnf("mempool tx backup %d in total with block_height %d", len(mempoolsBackup), commonBlock.GetHeight()) derivedQueries := query.GetDerivedQuery(bp.ChainType) err = bp.QueryExecutor.BeginTx() if err != nil { @@ -117,11 +118,11 @@ func (bp *BlockPopper) PopOffToBlock(commonBlock *model.Block) ([]*model.Block, if err != nil { return nil, err } + /* mempoolsBackupBytes format is [...{4}byteSize,{bytesSize}transactionBytes] */ - // TODO: Need to restore the backups from badger while getting mempool transactions before PushBlock process sizeMempool := uint32(len(mempool.GetTransactionBytes())) mempoolsBackupBytes.Write(util.ConvertUint32ToBytes(sizeMempool)) mempoolsBackupBytes.Write(mempool.GetTransactionBytes()) diff --git a/core/blockchainsync/blockPopper_test.go b/core/blockchainsync/blockPopper_test.go index ad2fc101d..03e4fabc7 100644 --- a/core/blockchainsync/blockPopper_test.go +++ b/core/blockchainsync/blockPopper_test.go @@ -8,6 +8,7 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/golang/mock/gomock" + "github.com/sirupsen/logrus" "github.com/zoobc/zoobc-core/common/chaintype" "github.com/zoobc/zoobc-core/common/kvdb" "github.com/zoobc/zoobc-core/common/model" @@ -124,6 +125,7 @@ func TestService_PopOffToBlock(t *testing.T) { ChainType chaintype.ChainType ActionTypeSwitcher transaction.TypeActionSwitcher KVDB kvdb.KVExecutorInterface + Logger *logrus.Logger } type args struct { commonBlock *model.Block @@ -173,6 +175,7 @@ func TestService_PopOffToBlock(t *testing.T) { BlockService: &mockServiceBlockFailGetBlockByID{}, ChainType: &mockServiceChainType{}, QueryExecutor: &mockServiceQueryExecutor{}, + Logger: logrus.New(), }, args: args{ commonBlock: &model.Block{ @@ -204,6 +207,7 @@ func TestService_PopOffToBlock(t *testing.T) { nil, nil, ), + Logger: logrus.New(), }, args: args{ commonBlock: &model.Block{ @@ -234,6 +238,7 @@ func TestService_PopOffToBlock(t *testing.T) { nil, nil, ), + Logger: logrus.New(), }, args: args{ commonBlock: &model.Block{ @@ -266,6 +271,7 @@ func TestService_PopOffToBlock(t *testing.T) { ), ActionTypeSwitcher: &transaction.TypeSwitcher{Executor: &mockPopOffToBlockReturnCommonBlock{}}, KVDB: kvdb.NewMockKVExecutorInterface(gomock.NewController(t)), + Logger: logrus.New(), }, args: args{ commonBlock: &model.Block{ @@ -286,6 +292,7 @@ func TestService_PopOffToBlock(t *testing.T) { MempoolService: tt.fields.MempoolService, ActionTypeSwitcher: tt.fields.ActionTypeSwitcher, KVDB: tt.fields.KVDB, + Logger: tt.fields.Logger, } got, err := bp.PopOffToBlock(tt.args.commonBlock) if (err != nil) != tt.wantErr { diff --git a/core/blockchainsync/blockchainSync.go b/core/blockchainsync/blockchainSync.go index de28983a7..178d2c98a 100644 --- a/core/blockchainsync/blockchainSync.go +++ b/core/blockchainsync/blockchainSync.go @@ -65,6 +65,7 @@ func NewBlockchainSyncService(blockService service.BlockServiceInterface, QueryExecutor: queryExecutor, ActionTypeSwitcher: txActionSwitcher, KVDB: kvdb, + Logger: logger, }, }, Logger: logger, diff --git a/core/blockchainsync/processFork.go b/core/blockchainsync/processFork.go index e65a348db..e2702f43b 100644 --- a/core/blockchainsync/processFork.go +++ b/core/blockchainsync/processFork.go @@ -229,7 +229,7 @@ func (fp *ForkingProcessor) restoreMempoolsBackup() error { size uint32 ) - prev += 4 // initiate length of size + prev += constant.TransactionBodyLength // initiate length of size size = commonUtil.ConvertBytesToUint32(mempoolsBackupBytes[:prev]) transactionBytes = mempoolsBackupBytes[prev:][:size] prev += size @@ -262,19 +262,18 @@ func (fp *ForkingProcessor) restoreMempoolsBackup() error { } err = txType.ApplyUnconfirmed() if err != nil { - errRollback := fp.QueryExecutor.RollbackTx() - if errRollback != nil { - return errRollback + err = fp.QueryExecutor.RollbackTx() + if err != nil { + return err } return err } err = fp.MempoolService.AddMempoolTransaction(mempoolTX) if err != nil { - errRollback := fp.QueryExecutor.RollbackTx() - if errRollback != nil { + err = fp.QueryExecutor.RollbackTx() + if err != nil { return err } - return err } err = fp.QueryExecutor.CommitTx() if err != nil {