Skip to content

Re-ApplyUnconfirmed pending transactions #853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions common/transaction/fixtureGenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,23 @@ func GetFixtureForSpecificTransaction(
tx.TransactionBody = nil
return tx, transactionBytes
}

func GetFixturesForBlock(height uint32, id int64) *model.Block {
return &model.Block{
ID: id,
BlockHash: []byte{},
PreviousBlockHash: []byte{},
Height: height,
Timestamp: 10000,
BlockSeed: []byte{},
BlockSignature: []byte{3},
CumulativeDifficulty: "1",
PayloadLength: 1,
PayloadHash: []byte{},
BlocksmithPublicKey: []byte{},
TotalAmount: 1000,
TotalFee: 0,
TotalCoinBase: 1,
Version: 0,
}
}
2 changes: 1 addition & 1 deletion core/blockchainsync/blockchainOrchestratorService.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ CheckLoop:
}

func (bos *BlockchainOrchestratorService) DownloadSnapshot(ct chaintype.ChainType) error {
bos.Logger.Info("dowloading snapshots...")
bos.Logger.Info("downloading snapshots...")
lastSpineBlockManifest, err := bos.SpineBlockManifestService.GetLastSpineBlockManifest(ct,
model.SpineBlockManifestType_Snapshot)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions core/service/blockSpineService.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func (bs *BlockSpineService) GetBlockHash(block *model.Block) ([]byte, error) {

}

// GetLastBlock return the last pushed block
// GetBlockByHeight return the last pushed block
func (bs *BlockSpineService) GetBlockByHeight(height uint32) (*model.Block, error) {
block, err := commonUtils.GetBlockByHeight(height, bs.QueryExecutor, bs.BlockQuery)
if err != nil {
Expand All @@ -454,7 +454,7 @@ func (bs *BlockSpineService) GetBlockByHeight(height uint32) (*model.Block, erro
return block, nil
}

// GetGenesis return the genesis block
// GetGenesisBlock return the genesis block
func (bs *BlockSpineService) GetGenesisBlock() (*model.Block, error) {
var (
genesisBlock model.Block
Expand Down Expand Up @@ -504,7 +504,7 @@ func (bs *BlockSpineService) PopulateBlockData(block *model.Block) error {
return nil
}

// GetPayloadBytes compute and return the block's payload hash
// GetPayloadHashAndLength compute and return the block's payload hash
func (bs *BlockSpineService) GetPayloadHashAndLength(block *model.Block) (payloadHash []byte, payloadLength uint32, err error) {
var (
digest = sha3.New256()
Expand Down
60 changes: 54 additions & 6 deletions core/service/snapshotMainBlockService.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ import (
"github.com/zoobc/zoobc-core/common/model"
"github.com/zoobc/zoobc-core/common/monitoring"
"github.com/zoobc/zoobc-core/common/query"
"github.com/zoobc/zoobc-core/common/transaction"
commonUtil "github.com/zoobc/zoobc-core/common/util"
)

type (
SnapshotMainBlockService struct {
SnapshotPath string
chainType chaintype.ChainType
TransactionUtil transaction.UtilInterface
TypeActionSwitcher transaction.TypeActionSwitcher
Logger *log.Logger
SnapshotBasicChunkStrategy SnapshotChunkStrategyInterface
QueryExecutor query.ExecutorInterface
Expand Down Expand Up @@ -56,6 +60,8 @@ func NewSnapshotMainBlockService(
snapshotQueries map[string]query.SnapshotQuery,
blocksmithSafeQueries map[string]bool,
derivedQueries []query.DerivedQuery,
transactionUtil transaction.UtilInterface,
typeSwitcher transaction.TypeActionSwitcher,
) *SnapshotMainBlockService {
return &SnapshotMainBlockService{
SnapshotPath: snapshotPath,
Expand All @@ -77,6 +83,8 @@ func NewSnapshotMainBlockService(
SnapshotQueries: snapshotQueries,
BlocksmithSafeQuery: blocksmithSafeQueries,
DerivedQueries: derivedQueries,
TransactionUtil: transactionUtil,
TypeActionSwitcher: typeSwitcher,
}
}

Expand Down Expand Up @@ -167,8 +175,17 @@ func (ss *SnapshotMainBlockService) NewSnapshotFile(block *model.Block) (snapsho

// ImportSnapshotFile parses a downloaded snapshot file into db
func (ss *SnapshotMainBlockService) ImportSnapshotFile(snapshotFileInfo *model.SnapshotFileInfo) error {
snapshotPayload, err := ss.SnapshotBasicChunkStrategy.BuildSnapshotFromChunks(snapshotFileInfo.GetSnapshotFileHash(),
snapshotFileInfo.GetFileChunksHashes(), ss.SnapshotPath)
var (
snapshotPayload *model.SnapshotPayload
currentBlock *model.Block
err error
)

snapshotPayload, err = ss.SnapshotBasicChunkStrategy.BuildSnapshotFromChunks(
snapshotFileInfo.GetSnapshotFileHash(),
snapshotFileInfo.GetFileChunksHashes(),
ss.SnapshotPath,
)
if err != nil {
return err
}
Expand All @@ -177,6 +194,38 @@ func (ss *SnapshotMainBlockService) ImportSnapshotFile(snapshotFileInfo *model.S
return err
}

ss.Logger.Infof("Need Re-ApplyUnconfirmed in %d pending transactions", len(snapshotPayload.GetPendingTransactions()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥇

/*
Need to manually ApplyUnconfirmed the pending transaction
after finished insert snapshot payload into DB
*/
currentBlock, err = commonUtil.GetLastBlock(ss.QueryExecutor, ss.BlockQuery)
if err != nil {
return err
}
for _, pendingTX := range snapshotPayload.GetPendingTransactions() {
var (
innerTX *model.Transaction
txType transaction.TypeAction
)
if pendingTX.GetStatus() == model.PendingTransactionStatus_PendingTransactionPending {

innerTX, err = ss.TransactionUtil.ParseTransactionBytes(pendingTX.GetTransactionBytes(), false)
if err != nil {
return err
}

innerTX.Height = currentBlock.GetHeight()
txType, err = ss.TypeActionSwitcher.GetTransactionType(innerTX)
if err != nil {
return err
}
err = txType.ApplyUnconfirmed()
if err != nil {
return err
}
}
}
return nil
}

Expand Down Expand Up @@ -305,14 +354,13 @@ func (ss *SnapshotMainBlockService) InsertSnapshotPayloadToDB(payload *model.Sna
}

for key, dQuery := range ss.DerivedQueries {
queries := dQuery.Rollback(height)
queries = dQuery.Rollback(height)
err = ss.QueryExecutor.ExecuteTransactions(queries)
if err != nil {
fmt.Println(key)
fmt.Println("Failed execute rollback queries, ", err.Error())
ss.Logger.Errorf("Failed execute rollback queries in %d: %s", key, err.Error())
err = ss.QueryExecutor.RollbackTx()
if err != nil {
fmt.Println("Failed to run RollbackTX DB")
ss.Logger.Warnf("Failed to run RollbackTX DB: %s", err.Error())
}
break
}
Expand Down
36 changes: 36 additions & 0 deletions core/service/snapshotMainBlockService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"reflect"
"regexp"
"testing"
"time"

Expand All @@ -16,6 +17,7 @@ import (
"github.com/zoobc/zoobc-core/common/constant"
"github.com/zoobc/zoobc-core/common/model"
"github.com/zoobc/zoobc-core/common/query"
"github.com/zoobc/zoobc-core/common/transaction"
)

type (
Expand Down Expand Up @@ -408,6 +410,32 @@ func (*mockSnapshotQueryExecutor) ExecuteSelect(query string, tx bool, args ...i
return db.Query("")
}

func (*mockSnapshotQueryExecutor) ExecuteSelectRow(qe string, _ bool, _ ...interface{}) (*sql.Row, error) {
db, mock, _ := sqlmock.New()
defer db.Close()

mockedBlock := transaction.GetFixturesForBlock(100, 123456789)
mockedRows := mock.NewRows(query.NewBlockQuery(chaintype.GetChainType(0)).Fields)
mockedRows.AddRow(
mockedBlock.GetID(),
mockedBlock.GetBlockHash(),
mockedBlock.GetPreviousBlockHash(),
mockedBlock.GetHeight(),
mockedBlock.GetTimestamp(),
mockedBlock.GetBlockSeed(),
mockedBlock.GetBlockSignature(),
mockedBlock.GetCumulativeDifficulty(),
mockedBlock.GetPayloadLength(),
mockedBlock.GetPayloadHash(),
mockedBlock.GetBlocksmithPublicKey(),
mockedBlock.GetTotalAmount(),
mockedBlock.GetTotalFee(),
mockedBlock.GetTotalCoinBase(),
mockedBlock.GetVersion(),
)
mock.ExpectQuery(regexp.QuoteMeta(qe)).WillReturnRows(mockedRows)
return db.QueryRow(qe), nil
}
func (mocksbcs *mockSnapshotBasicChunkStrategy) GenerateSnapshotChunks(snapshotPayload *model.SnapshotPayload,
filePath string) (fullHash []byte,
fileChunkHashes [][]byte, err error) {
Expand Down Expand Up @@ -755,6 +783,8 @@ func TestSnapshotMainBlockService_ImportSnapshotFile(t *testing.T) {
SnapshotQueries map[string]query.SnapshotQuery
BlocksmithSafeQuery map[string]bool
DerivedQueries []query.DerivedQuery
TransactionUtil transaction.UtilInterface
TypeActionSwitcher transaction.TypeActionSwitcher
}
tests := []struct {
name string
Expand Down Expand Up @@ -788,6 +818,10 @@ func TestSnapshotMainBlockService_ImportSnapshotFile(t *testing.T) {
SnapshotQueries: query.GetSnapshotQuery(chaintype.GetChainType(0)),
BlocksmithSafeQuery: query.GetBlocksmithSafeQuery(chaintype.GetChainType(0)),
DerivedQueries: query.GetDerivedQuery(chaintype.GetChainType(0)),
TransactionUtil: &transaction.Util{},
TypeActionSwitcher: &transaction.TypeSwitcher{
Executor: &mockSnapshotQueryExecutor{success: true},
},
},
},
}
Expand All @@ -813,6 +847,8 @@ func TestSnapshotMainBlockService_ImportSnapshotFile(t *testing.T) {
SnapshotQueries: tt.fields.SnapshotQueries,
BlocksmithSafeQuery: tt.fields.BlocksmithSafeQuery,
DerivedQueries: tt.fields.DerivedQueries,
TransactionUtil: tt.fields.TransactionUtil,
TypeActionSwitcher: tt.fields.TypeActionSwitcher,
}
snapshotFileInfo, err := ss.NewSnapshotFile(blockForSnapshot1)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/service/spineBlockManifestService.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (ss *SpineBlockManifestService) InsertSpineBlockManifest(spineBlockManifest
return nil
}

// GetBodyBytes translate tx body to bytes representation
// GetSpineBlockManifestBytes translate tx body to bytes representation
func (ss *SpineBlockManifestService) GetSpineBlockManifestBytes(spineBlockManifest *model.SpineBlockManifest) []byte {
buffer := bytes.NewBuffer([]byte{})
buffer.Write(util.ConvertUint64ToBytes(uint64(spineBlockManifest.ID)))
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ func init() {
query.GetSnapshotQuery(mainchain),
query.GetBlocksmithSafeQuery(mainchain),
query.GetDerivedQuery(mainchain),
transactionUtil,
&transaction.TypeSwitcher{Executor: queryExecutor},
)

snapshotService = service.NewSnapshotService(
Expand Down