Skip to content

Rollback mempool - Restoring mempool transactions backup #419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Nov 15, 2019
11 changes: 6 additions & 5 deletions core/blockchainsync/blockPopper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type BlockPopper struct {
ChainType chaintype.ChainType
ActionTypeSwitcher transaction.TypeActionSwitcher
KVDB kvdb.KVExecutorInterface
Logger *log.Logger
}

// PopOffToBlock will remove the block in current Chain until commonBlock is reached
Expand All @@ -41,8 +42,8 @@ func (bp *BlockPopper) PopOffToBlock(commonBlock *model.Block) ([]*model.Block,
minRollbackHeight := getMinRollbackHeight(lastBlock.Height)

if commonBlock.Height < minRollbackHeight {
// TODO: handle it appropriately and analyze the effect if this returning empty element in the further processfork pocess
log.Warn("the node blockchain detects hardfork, please manually delete the database to recover")
// TODO: handle it appropriately and analyze the effect if this returning empty element in the further processfork process
bp.Logger.Warn("the node blockchain detects hardfork, please manually delete the database to recover")
return []*model.Block{}, nil
}

Expand Down Expand Up @@ -73,7 +74,7 @@ func (bp *BlockPopper) PopOffToBlock(commonBlock *model.Block) ([]*model.Block,
if err != nil {
return nil, err
}

bp.Logger.Warnf("mempool tx backup %d in total with block_height %d", len(mempoolsBackup), commonBlock.GetHeight())
derivedQueries := query.GetDerivedQuery(bp.ChainType)
err = bp.QueryExecutor.BeginTx()
if err != nil {
Expand Down Expand Up @@ -117,11 +118,11 @@ func (bp *BlockPopper) PopOffToBlock(commonBlock *model.Block) ([]*model.Block,
if err != nil {
return nil, err
}

/*
mempoolsBackupBytes format is
[...{4}byteSize,[bytesSize]transactionBytes]
[...{4}byteSize,{bytesSize}transactionBytes]
*/
// TODO: Need to restore the backups from badger while getting mempool transactions before PushBlock process
sizeMempool := uint32(len(mempool.GetTransactionBytes()))
mempoolsBackupBytes.Write(util.ConvertUint32ToBytes(sizeMempool))
mempoolsBackupBytes.Write(mempool.GetTransactionBytes())
Expand Down
7 changes: 7 additions & 0 deletions core/blockchainsync/blockPopper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/DATA-DOG/go-sqlmock"
"github.com/golang/mock/gomock"
"github.com/sirupsen/logrus"
"github.com/zoobc/zoobc-core/common/chaintype"
"github.com/zoobc/zoobc-core/common/kvdb"
"github.com/zoobc/zoobc-core/common/model"
Expand Down Expand Up @@ -124,6 +125,7 @@ func TestService_PopOffToBlock(t *testing.T) {
ChainType chaintype.ChainType
ActionTypeSwitcher transaction.TypeActionSwitcher
KVDB kvdb.KVExecutorInterface
Logger *logrus.Logger
}
type args struct {
commonBlock *model.Block
Expand Down Expand Up @@ -173,6 +175,7 @@ func TestService_PopOffToBlock(t *testing.T) {
BlockService: &mockServiceBlockFailGetBlockByID{},
ChainType: &mockServiceChainType{},
QueryExecutor: &mockServiceQueryExecutor{},
Logger: logrus.New(),
},
args: args{
commonBlock: &model.Block{
Expand Down Expand Up @@ -204,6 +207,7 @@ func TestService_PopOffToBlock(t *testing.T) {
nil,
nil,
),
Logger: logrus.New(),
},
args: args{
commonBlock: &model.Block{
Expand Down Expand Up @@ -234,6 +238,7 @@ func TestService_PopOffToBlock(t *testing.T) {
nil,
nil,
),
Logger: logrus.New(),
},
args: args{
commonBlock: &model.Block{
Expand Down Expand Up @@ -266,6 +271,7 @@ func TestService_PopOffToBlock(t *testing.T) {
),
ActionTypeSwitcher: &transaction.TypeSwitcher{Executor: &mockPopOffToBlockReturnCommonBlock{}},
KVDB: kvdb.NewMockKVExecutorInterface(gomock.NewController(t)),
Logger: logrus.New(),
},
args: args{
commonBlock: &model.Block{
Expand All @@ -286,6 +292,7 @@ func TestService_PopOffToBlock(t *testing.T) {
MempoolService: tt.fields.MempoolService,
ActionTypeSwitcher: tt.fields.ActionTypeSwitcher,
KVDB: tt.fields.KVDB,
Logger: tt.fields.Logger,
}
got, err := bp.PopOffToBlock(tt.args.commonBlock)
if (err != nil) != tt.wantErr {
Expand Down
2 changes: 2 additions & 0 deletions core/blockchainsync/blockchainSync.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ func NewBlockchainSyncService(blockService service.BlockServiceInterface,
QueryExecutor: queryExecutor,
ActionTypeSwitcher: txActionSwitcher,
MempoolService: mempoolService,
Logger: logger,
BlockPopper: &BlockPopper{
ChainType: blockService.GetChainType(),
BlockService: blockService,
MempoolService: mempoolService,
QueryExecutor: queryExecutor,
ActionTypeSwitcher: txActionSwitcher,
KVDB: kvdb,
Logger: logger,
},
},
Logger: logger,
Expand Down
100 changes: 92 additions & 8 deletions core/blockchainsync/processFork.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/zoobc/zoobc-core/common/blocker"
"github.com/zoobc/zoobc-core/common/chaintype"
"github.com/zoobc/zoobc-core/common/constant"
"github.com/zoobc/zoobc-core/common/model"
"github.com/zoobc/zoobc-core/common/query"
"github.com/zoobc/zoobc-core/common/transaction"
Expand All @@ -26,15 +27,18 @@ type (
QueryExecutor query.ExecutorInterface
ActionTypeSwitcher transaction.TypeActionSwitcher
MempoolService service.MempoolServiceInterface
Logger *log.Logger
}
)

// ProcessFork processes the forked blocks
func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock *model.Block, feederPeer *model.Peer) error {

var (
err error
myPoppedOffBlocks, peerPoppedOffBlocks []*model.Block
lastBlockBeforeProcess, lastBlock, currentLastBlock *model.Block
myPoppedOffBlocks, peerPoppedOffBlocks []*model.Block
pushedForkBlocks int
err error
)

lastBlockBeforeProcess, err = fp.BlockService.GetLastBlock()
Expand All @@ -47,8 +51,6 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock *
return err
}

pushedForkBlocks := 0

lastBlock, err = fp.BlockService.GetLastBlock()
if err != nil {
return err
Expand All @@ -70,13 +72,13 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock *
if err != nil {
// TODO: analyze the mechanism of blacklisting peer here
// bd.P2pService.Blacklist(peer)
log.Warnf("[pushing fork block] failed to verify block %v from peer: %s\nwith previous: %v\n", block.ID, err, lastBlock.ID)
fp.Logger.Warnf("[pushing fork block] failed to verify block %v from peer: %s\nwith previous: %v\n", block.ID, err, lastBlock.ID)
}
err = fp.BlockService.PushBlock(lastBlock, block, false)
if err != nil {
// TODO: blacklist the wrong peer
// fp.P2pService.Blacklist(feederPeer)
log.Warnf("\n\nPushBlock err %v\n\n", err)
fp.Logger.Warnf("\n\nPushBlock err %v\n\n", err)
break
}
pushedForkBlocks++
Expand Down Expand Up @@ -107,7 +109,7 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock *
// if no fork blocks successfully applied, go back to our fork
// other wise, just take the transactions of our popped blocks to be processed later
if pushedForkBlocks == 0 {
log.Println("Did not accept any blocks from peer, pushing back my blocks")
fp.Logger.Println("Did not accept any blocks from peer, pushing back my blocks")
for _, block := range myPoppedOffBlocks {
lastBlock, err = fp.BlockService.GetLastBlock()
if err != nil {
Expand All @@ -117,7 +119,7 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock *
if err != nil {
// TODO: analyze the mechanism of blacklisting peer here
// bd.P2pService.Blacklist(peer)
log.Warnf("[pushing back own block] failed to verify block %v from peer: %s\n with previous: %v\n", block.ID, err, lastBlock.ID)
fp.Logger.Warnf("[pushing back own block] failed to verify block %v from peer: %s\n with previous: %v\n", block.ID, err, lastBlock.ID)
return err
}
err = fp.BlockService.PushBlock(lastBlock, block, false)
Expand All @@ -131,6 +133,11 @@ func (fp *ForkingProcessor) ProcessFork(forkBlocks []*model.Block, commonBlock *
}
}

// start restoring mempool from badgerDB
err = fp.restoreMempoolsBackup()
if err != nil {
fp.Logger.Errorf("RestoreBackupFail: %s", err.Error())
}
return nil
}

Expand Down Expand Up @@ -198,3 +205,80 @@ func (fp *ForkingProcessor) ProcessLater(txs []*model.Transaction) error {
func (fp *ForkingProcessor) ScheduleScan(height uint32, validate bool) {
// TODO: analyze if this mechanism is necessary
}

// restoreMempoolsBackup will restore transaction from badgerDB and try to re-ApplyUnconfirmed
func (fp *ForkingProcessor) restoreMempoolsBackup() error {

var (
mempoolsBackupBytes []byte
prev uint32
err error
)

mempoolsBackupBytes, err = fp.BlockPopper.KVDB.Get(constant.KVDBMempoolsBackup)
if err != nil {
return err
}

for int(prev) < len(mempoolsBackupBytes) {
var (
transactionBytes []byte
mempoolTX *model.MempoolTransaction
txType transaction.TypeAction
tx *model.Transaction
size uint32
)

prev += constant.TransactionBodyLength // initiate length of size
size = commonUtil.ConvertBytesToUint32(mempoolsBackupBytes[:prev])
transactionBytes = mempoolsBackupBytes[prev:][:size]
prev += size

tx, err = transaction.ParseTransactionBytes(transactionBytes, true)
if err != nil {
return err
}
mempoolTX = &model.MempoolTransaction{
FeePerByte: commonUtil.FeePerByteTransaction(tx.GetFee(), transactionBytes),
ID: tx.ID,
TransactionBytes: transactionBytes,
ArrivalTimestamp: time.Now().Unix(),
SenderAccountAddress: tx.SenderAccountAddress,
RecipientAccountAddress: tx.RecipientAccountAddress,
}
err = fp.MempoolService.ValidateMempoolTransaction(mempoolTX)
if err != nil {
return err
}

txType, err = fp.ActionTypeSwitcher.GetTransactionType(tx)
if err != nil {
return err
}
// Apply Unconfirmed
err = fp.QueryExecutor.BeginTx()
if err != nil {
return err
}
err = txType.ApplyUnconfirmed()
if err != nil {
err = fp.QueryExecutor.RollbackTx()
if err != nil {
return err
}
return err
}
err = fp.MempoolService.AddMempoolTransaction(mempoolTX)
if err != nil {
err = fp.QueryExecutor.RollbackTx()
if err != nil {
return err
}
}
err = fp.QueryExecutor.CommitTx()
if err != nil {
return err
}
}
return nil
}
70 changes: 0 additions & 70 deletions core/blockchainsync/processFork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ type mockServiceBlockSuccess struct {
service.BlockServiceInterface
}

type mockServiceForkingProcessSuccess struct {
ForkingProcessorInterface
}

type mockServiceChainType struct {
chaintype.ChainType
}
Expand All @@ -40,27 +36,10 @@ type mockServiceBlockFailGetBlockByID struct {
service.BlockServiceInterface
}

// FORKING SERVICE FAILS
type mockServiceForkingProcessFail struct {
ForkingProcessorInterface
}

// SERVICE QUERY EXECUTOR FAILS
type mockServiceQueryExecutorBeginTXFail struct {
query.ExecutorInterface
}
type mockServiceQueryExecutorExecuteTransFail struct {
query.ExecutorInterface
}
type mockServiceQueryExecutorCommitTXFail struct {
query.ExecutorInterface
}

// Function mock for Forking interface
func (*mockServiceForkingProcessSuccess) getMinRollbackHeight() (uint32, error) {
return 20, nil
}

// Mock function for Block interface
func (*mockServiceBlockSuccess) GetLastBlock() (*model.Block, error) {
return &model.Block{ID: 58, Height: 66}, nil
Expand Down Expand Up @@ -209,55 +188,6 @@ func (*mockServiceBlockFailGetBlockByID) GetTransactionsByBlockID(blockID int64)
return transaction, nil
}

// FORKING PPROCESS SERVICE FAIL
func (*mockServiceForkingProcessFail) getMinRollbackHeight() (uint32, error) {
return 0, blocker.NewBlocker(
blocker.AuthErr,
"ERROR WHEN GETTING MINIMAL HEIGHT FOR ROLLBACK",
)
}

// QUERY EXECUTOR SERVICE FAILS
// BEGIN TX FUNC FAIL
func (*mockServiceQueryExecutorBeginTXFail) BeginTx() error {
return blocker.NewBlocker(
blocker.AuthErr,
"failed to begin TX",
)
}

func (*mockServiceQueryExecutorBeginTXFail) ExecuteTransaction(qStr string, args ...interface{}) error {
return nil
}

func (*mockServiceQueryExecutorBeginTXFail) CommitTx() error {
return nil
}

func (*mockServiceQueryExecutorBeginTXFail) RollbackTx() error {
return nil
}

// EXECUTE TRANSACTION FAIL
func (*mockServiceQueryExecutorExecuteTransFail) BeginTx() error {
return nil
}

func (*mockServiceQueryExecutorExecuteTransFail) ExecuteTransactions(queries [][]interface{}) error {
return blocker.NewBlocker(
blocker.AuthErr,
"failed to execute Transactions",
)
}

func (*mockServiceQueryExecutorExecuteTransFail) CommitTx() error {
return nil
}

func (*mockServiceQueryExecutorExecuteTransFail) RollbackTx() error {
return nil
}

// COMMITX FAIL
func (*mockServiceQueryExecutorCommitTXFail) BeginTx() error {
return nil
Expand Down
2 changes: 1 addition & 1 deletion core/service/mempoolCoreService.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (mps *MempoolService) GetMempoolTransaction(id int64) (*model.MempoolTransa
return nil, blocker.NewBlocker(blocker.DBRowNotFound, "MempoolTransactionNotFound")
}

// AddMempoolTransaction validates and insert a transaction into the mempool
// AddMempoolTransaction validates and insert a transaction into the mempool and also set the BlockHeight as well
func (mps *MempoolService) AddMempoolTransaction(mpTx *model.MempoolTransaction) error {

// check maximum mempool
Expand Down