Skip to content

Add node registry partial history to snapshots #734

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions common/constant/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,4 @@ const (
ReceiptBatchPickMultiplier = uint32(5)
ReceiptHashSize = 32 // sha256
ReceiptGenerateMarkleRootPeriod = 20 * time.Second
// number of blocks to lookup for receipts before current height (
// this is also the number of blocks to consider when selecting receipts to be included in a snapshot)
LinkedReceiptBlocksLimit = uint32(720)
)
19 changes: 16 additions & 3 deletions common/query/nodeRegistrationQuery.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,23 @@ func (nrq *NodeRegistrationQuery) Scan(nr *model.NodeRegistration, row *sql.Row)
return nil
}

// SelectDataForSnapshot this query selects only node registry latest state from height 0 to 'fromHeight' (
// excluded) and all records from 'fromHeight' to 'toHeight',
// removing from first selection records that have duplicate ids with second second selection.
// This way we make sure only one version of every id has 'latest' field set to true
func (nrq *NodeRegistrationQuery) SelectDataForSnapshot(fromHeight, toHeight uint32) string {
return fmt.Sprintf("SELECT %s FROM %s WHERE (id, height) IN (SELECT t2.id, MAX("+
"t2.height) FROM %s as t2 WHERE height >= %d AND height <= %d GROUP BY t2.id) ORDER BY height",
strings.Join(nrq.Fields, ","), nrq.getTableName(), nrq.getTableName(), fromHeight, toHeight)
if fromHeight > 0 {
return fmt.Sprintf("SELECT %s FROM %s WHERE (id, height) IN (SELECT t2.id, "+
"MAX(t2.height) FROM %s as t2 WHERE t2.height >= 0 AND t2.height < %d GROUP BY t2.id) "+
"AND id NOT IN (SELECT DISTINCT t3.id FROM %s as t3 WHERE t3.height >= %d AND t3.height < %d) "+
"UNION ALL SELECT %s FROM %s WHERE height >= %d AND height <= %d "+
"ORDER BY height, id",
strings.Join(nrq.Fields, ","), nrq.getTableName(), nrq.getTableName(), fromHeight,
nrq.getTableName(), fromHeight, toHeight,
strings.Join(nrq.Fields, ","), nrq.getTableName(), fromHeight, toHeight)
}
return fmt.Sprintf("SELECT %s FROM %s WHERE height >= %d AND height <= %d ORDER BY height, id",
strings.Join(nrq.Fields, ","), nrq.getTableName(), fromHeight, toHeight)
}

// TrimDataBeforeSnapshot delete entries to assure there are no duplicates before applying a snapshot
Expand Down
71 changes: 59 additions & 12 deletions common/query/nodeRegistrationQuery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,18 +437,6 @@ func TestNodeRegistrationQuery_InsertNodeRegistration(t *testing.T) {
})
}

func TestNodeRegistrationQuery_SelectDataForSnapshot(t *testing.T) {
t.Run("GetActiveNodeRegistrations", func(t *testing.T) {
res := mockNodeRegistrationQuery.SelectDataForSnapshot(0, 10)
want := "SELECT id,node_public_key,account_address,registration_height,node_address,locked_balance,registration_status,latest," +
"height FROM node_registry WHERE (id, height) IN (SELECT t2.id, " +
"MAX(t2.height) FROM node_registry as t2 WHERE height >= 0 AND height <= 10 GROUP BY t2.id) ORDER BY height"
if res != want {
t.Errorf("string not match:\nget: %s\nwant: %s", res, want)
}
})
}

func TestNodeRegistrationQuery_TrimDataBeforeSnapshot(t *testing.T) {
t.Run("TrimDataBeforeSnapshot:success", func(t *testing.T) {
res := mockNodeRegistrationQuery.TrimDataBeforeSnapshot(0, 10)
Expand All @@ -458,3 +446,62 @@ func TestNodeRegistrationQuery_TrimDataBeforeSnapshot(t *testing.T) {
}
})
}

func TestNodeRegistrationQuery_SelectDataForSnapshot(t *testing.T) {
type fields struct {
Fields []string
TableName string
}
type args struct {
fromHeight uint32
toHeight uint32
}
tests := []struct {
name string
fields fields
args args
want string
}{
{
name: "SelectDataForSnapshot:success-{fromGenesis}",
fields: fields{
TableName: NewNodeRegistrationQuery().TableName,
Fields: NewNodeRegistrationQuery().Fields,
},
args: args{
fromHeight: 0,
toHeight: 10,
},
want: "SELECT id,node_public_key,account_address,registration_height,node_address,locked_balance,registration_status,latest," +
"height FROM node_registry WHERE height >= 0 AND height <= 10 ORDER BY height, id",
},
{
name: "SelectDataForSnapshot:success-{fromArbitraryHeight}",
fields: fields{
TableName: NewNodeRegistrationQuery().TableName,
Fields: NewNodeRegistrationQuery().Fields,
},
args: args{
fromHeight: 720,
toHeight: 1440,
},
want: "SELECT id,node_public_key,account_address,registration_height,node_address,locked_balance,registration_status,latest," +
"height FROM node_registry WHERE (id, height) IN (SELECT t2.id, MAX(t2.height) FROM node_registry as t2 WHERE t2." +
"height >= 0 AND t2.height < 720 GROUP BY t2.id) AND id NOT IN (SELECT DISTINCT t3.id FROM node_registry as t3 WHERE t3." +
"height >= 720 AND t3.height < 1440) UNION ALL SELECT id,node_public_key,account_address,registration_height,node_address," +
"locked_balance,registration_status,latest," +
"height FROM node_registry WHERE height >= 720 AND height <= 1440 ORDER BY height, id",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nrq := &NodeRegistrationQuery{
Fields: tt.fields.Fields,
TableName: tt.fields.TableName,
}
if got := nrq.SelectDataForSnapshot(tt.args.fromHeight, tt.args.toHeight); got != tt.want {
t.Errorf("NodeRegistrationQuery.SelectDataForSnapshot() = %v, want %v", got, tt.want)
}
})
}
}
18 changes: 17 additions & 1 deletion common/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func GetDerivedQuery(ct chaintype.ChainType) (derivedQuery []DerivedQuery) {
return derivedQuery
}

// GetSnapshotQuery func to get all query repos that have a SelectDataForSnapshot method
// GetSnapshotQuery func to get all query repos that have a SelectDataForSnapshot method (that have data to be included in snapshots)
func GetSnapshotQuery(ct chaintype.ChainType) (snapshotQuery map[string]SnapshotQuery) {
switch ct.(type) {
case *chaintype.MainChain:
Expand All @@ -69,3 +69,19 @@ func GetSnapshotQuery(ct chaintype.ChainType) (snapshotQuery map[string]Snapshot
}
return snapshotQuery
}

// GetBlocksmithSafeQuery func to get all query repos that must save their full history in snapshots,
// for a minRollbackHeight number of blocks, to not break blocksmith process logic
func GetBlocksmithSafeQuery(ct chaintype.ChainType) (snapshotQuery map[string]bool) {
switch ct.(type) {
case *chaintype.MainChain:
snapshotQuery = map[string]bool{
"block": true,
"nodeRegistration": true,
"publishedReceipt": true,
}
default:
snapshotQuery = map[string]bool{}
}
return snapshotQuery
}
16 changes: 6 additions & 10 deletions core/blockchainsync/blockchainOrchestratorService.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blockchainsync

import (
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -31,8 +30,7 @@ type (
// of the blockchains so that the expected behavior is consistent within the application.
// In the future, this service may also be expanded to orchestrate the smithing activity of the blockchains
func NewBlockchainOrchestratorService(
spinechainSyncService BlockchainSyncServiceInterface,
mainchainSyncService BlockchainSyncServiceInterface,
spinechainSyncService, mainchainSyncService BlockchainSyncServiceInterface,
blockchainStatusService service.BlockchainStatusServiceInterface,
spineBlockManifestService service.SpineBlockManifestServiceInterface,
fileDownloader p2p.FileDownloaderInterface,
Expand Down Expand Up @@ -110,12 +108,10 @@ func (bos *BlockchainOrchestratorService) DownloadSnapshot(ct chaintype.ChainTyp
if err != nil {
bos.Logger.Warning(err)
return err
} else {
if err := bos.MainchainSnapshotBlockServices.ImportSnapshotFile(snapshotFileInfo); err != nil {
bos.Logger.Warningf("error importing snapshot file for chaintype %s at height %d: %s\n", ct.GetName(),
lastSpineBlockManifest.SpineBlockManifestHeight, err.Error())
return err
}
} else if err := bos.MainchainSnapshotBlockServices.ImportSnapshotFile(snapshotFileInfo); err != nil {
bos.Logger.Warningf("error importing snapshot file for chaintype %s at height %d: %s\n", ct.GetName(),
lastSpineBlockManifest.SpineBlockManifestHeight, err.Error())
return err
}

}
Expand All @@ -133,7 +129,7 @@ func (bos *BlockchainOrchestratorService) Start() error {

lastMainBlock, err := bos.MainchainSyncService.GetBlockService().GetLastBlock()
if err != nil {
return errors.New(fmt.Sprintf("cannot get last main block: %s", err.Error()))
return fmt.Errorf("cannot get last main block: %s", err.Error())
}
if lastMainBlock.Height == 0 &&
bos.MainchainSyncService.GetBlockService().GetChainType().HasSnapshots() {
Expand Down
17 changes: 11 additions & 6 deletions core/blockchainsync/blockchainOrchestratorService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,18 @@ type (
}
)

func (*MockSpineBlockManifestServiceError) GetLastSpineBlockManifest(ct chaintype.ChainType, mbType model.SpineBlockManifestType) (*model.SpineBlockManifest, error) {
func (*MockSpineBlockManifestServiceError) GetLastSpineBlockManifest(ct chaintype.ChainType,
mbType model.SpineBlockManifestType) (*model.SpineBlockManifest, error) {
return nil, errors.New("GetLastSpineBlockManifest error")
}

func (*MockSpineBlockManifestServiceSuccessNoSpineBlockManifest) GetLastSpineBlockManifest(ct chaintype.ChainType, mbType model.SpineBlockManifestType) (*model.SpineBlockManifest, error) {
func (*MockSpineBlockManifestServiceSuccessNoSpineBlockManifest) GetLastSpineBlockManifest(ct chaintype.ChainType,
mbType model.SpineBlockManifestType) (*model.SpineBlockManifest, error) {
return nil, nil
}

func (*MockSpineBlockManifestServiceSuccessWithSpineBlockManifest) GetLastSpineBlockManifest(ct chaintype.ChainType, mbType model.SpineBlockManifestType) (*model.SpineBlockManifest, error) {
func (*MockSpineBlockManifestServiceSuccessWithSpineBlockManifest) GetLastSpineBlockManifest(ct chaintype.ChainType,
mbType model.SpineBlockManifestType) (*model.SpineBlockManifest, error) {
return &model.SpineBlockManifest{}, nil
}

Expand Down Expand Up @@ -116,11 +119,13 @@ func (*MockBlockServiceSuccess) GetLastBlock() (*model.Block, error) {
return &model.Block{}, nil
}

func (*MockFileDownloaderError) DownloadSnapshot(ct chaintype.ChainType, spineBlockManifest *model.SpineBlockManifest) (*model.SnapshotFileInfo, error) {
func (*MockFileDownloaderError) DownloadSnapshot(ct chaintype.ChainType, spineBlockManifest *model.SpineBlockManifest) (*model.
SnapshotFileInfo, error) {
return nil, errors.New("DownloadSnapshot error")
}

func (*MockFileDownloaderSuccess) DownloadSnapshot(ct chaintype.ChainType, spineBlockManifest *model.SpineBlockManifest) (*model.SnapshotFileInfo, error) {
func (*MockFileDownloaderSuccess) DownloadSnapshot(ct chaintype.ChainType, spineBlockManifest *model.SpineBlockManifest) (*model.
SnapshotFileInfo, error) {
return &model.SnapshotFileInfo{}, nil
}

Expand Down Expand Up @@ -248,7 +253,7 @@ func TestBlockchainOrchestratorService_DownloadSnapshot(t *testing.T) {
MainchainSnapshotBlockServices: tt.fields.MainchainSnapshotBlockServices,
Logger: tt.fields.Logger,
}
bos.DownloadSnapshot(tt.args.ct)
_ = bos.DownloadSnapshot(tt.args.ct)
if err := bos.DownloadSnapshot(tt.args.ct); (err != nil) != tt.wantErr {
t.Errorf("BlockchainOrchestratorService.DownloadSnapshot() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down
21 changes: 15 additions & 6 deletions core/service/blockMainService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,8 @@ func (*mockBlocksmithServicePushBlock) GetSortedBlocksmithsMap(*model.Block) map
func (*mockBlocksmithServicePushBlock) SortBlocksmiths(block *model.Block, withLock bool) {
}

func (*mockBlocksmithServicePushBlock) IsBlockTimestampValid(blocksmithIndex, numberOfBlocksmiths int64, previousBlock, currentBlock *model.Block) error {
func (*mockBlocksmithServicePushBlock) IsBlockTimestampValid(blocksmithIndex, numberOfBlocksmiths int64, previousBlock,
currentBlock *model.Block) error {
return nil
}

Expand Down Expand Up @@ -2702,7 +2703,8 @@ func (*mockBlocksmithServiceReceiveBlock) GetSortedBlocksmithsMap(block *model.B
}
}

func (*mockBlocksmithServiceReceiveBlock) IsBlockTimestampValid(blocksmithIndex, numberOfBlocksmiths int64, previousBlock, currentBlock *model.Block) error {
func (*mockBlocksmithServiceReceiveBlock) IsBlockTimestampValid(blocksmithIndex, numberOfBlocksmiths int64, previousBlock,
currentBlock *model.Block) error {
return nil
}

Expand All @@ -2729,7 +2731,11 @@ func (*mockQueryExecutorReceiveBlockFail) ExecuteSelectRow(qStr string, tx bool,
return db.QueryRow(qStr), nil
}

func (bss *mockBlocksmithServiceReceiveBlock) IsValidSmithTime(blocksmithIndex int64, numberOfBlocksmiths int64, previousBlock *model.Block) error {
func (bss *mockBlocksmithServiceReceiveBlock) IsValidSmithTime(
blocksmithIndex,
numberOfBlocksmiths int64,
previousBlock *model.Block,
) error {
return nil
}

Expand Down Expand Up @@ -3620,11 +3626,13 @@ func (*mockBlocksmithServiceValidateBlockSuccess) GetSortedBlocksmithsMap(*model
}
}

func (*mockBlocksmithServiceValidateBlockSuccess) IsBlockTimestampValid(blocksmithIndex, numberOfBlocksmiths int64, previousBlock, currentBlock *model.Block) error {
func (*mockBlocksmithServiceValidateBlockSuccess) IsBlockTimestampValid(blocksmithIndex, numberOfBlocksmiths int64, previousBlock,
currentBlock *model.Block) error {
return nil
}

func (*mockBlocksmithServiceValidateBlockSuccess) IsValidSmithTime(blocksmithIndex int64, numberOfBlocksmiths int64, previousBlock *model.Block) error {
func (*mockBlocksmithServiceValidateBlockSuccess) IsValidSmithTime(blocksmithIndex, numberOfBlocksmiths int64,
previousBlock *model.Block) error {
return nil
}

Expand Down Expand Up @@ -4490,7 +4498,8 @@ func (*mockBlocksmithServiceProcessQueued) GetSortedBlocksmithsMap(block *model.
func (*mockBlocksmithServiceProcessQueued) SortBlocksmiths(block *model.Block, withLock bool) {
}

func (*mockBlocksmithServiceProcessQueued) IsBlockTimestampValid(blocksmithIndex, numberOfBlocksmiths int64, previousBlock, currentBlock *model.Block) error {
func (*mockBlocksmithServiceProcessQueued) IsBlockTimestampValid(blocksmithIndex, numberOfBlocksmiths int64, previousBlock,
currentBlock *model.Block) error {
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions core/service/blockSpineService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,8 @@ func (*mockSpineBlocksmithServicePushBlock) GetSortedBlocksmithsMap(*model.Block
}
func (*mockSpineBlocksmithServicePushBlock) SortBlocksmiths(block *model.Block, withLock bool) {
}
func (*mockSpineBlocksmithServicePushBlock) IsBlockTimestampValid(blocksmithIndex, numberOfBlocksmiths int64, previousBlock, currentBlock *model.Block) error {
func (*mockSpineBlocksmithServicePushBlock) IsBlockTimestampValid(blocksmithIndex, numberOfBlocksmiths int64, previousBlock,
currentBlock *model.Block) error {
return nil
}
func TestBlockSpineService_PushBlock(t *testing.T) {
Expand Down Expand Up @@ -2843,7 +2844,8 @@ func (*mockSpineBlocksmithServiceValidateBlockSuccess) GetSortedBlocksmithsMap(*
string(mockSpineBlockData.BlocksmithPublicKey): &secondIndex,
}
}
func (*mockSpineBlocksmithServiceValidateBlockSuccess) IsBlockTimestampValid(blocksmithIndex, numberOfBlocksmiths int64, previousBlock, currentBlock *model.Block) error {
func (*mockSpineBlocksmithServiceValidateBlockSuccess) IsBlockTimestampValid(blocksmithIndex, numberOfBlocksmiths int64, previousBlock,
currentBlock *model.Block) error {
return nil
}

Expand Down
17 changes: 8 additions & 9 deletions core/service/snapshotMainBlockService.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type (
SkippedBlocksmithQuery query.SkippedBlocksmithQueryInterface
BlockQuery query.BlockQueryInterface
SnapshotQueries map[string]query.SnapshotQuery
BlocksmithSafeQuery map[string]bool
DerivedQueries []query.DerivedQuery
}
)
Expand All @@ -52,6 +53,7 @@ func NewSnapshotMainBlockService(
skippedBlocksmithQuery query.SkippedBlocksmithQueryInterface,
blockQuery query.BlockQueryInterface,
snapshotQueries map[string]query.SnapshotQuery,
blocksmithSafeQueries map[string]bool,
derivedQueries []query.DerivedQuery,
) *SnapshotMainBlockService {
return &SnapshotMainBlockService{
Expand All @@ -72,6 +74,7 @@ func NewSnapshotMainBlockService(
SkippedBlocksmithQuery: skippedBlocksmithQuery,
BlockQuery: blockQuery,
SnapshotQueries: snapshotQueries,
BlocksmithSafeQuery: blocksmithSafeQueries,
DerivedQueries: derivedQueries,
}
}
Expand Down Expand Up @@ -99,15 +102,11 @@ func (ss *SnapshotMainBlockService) NewSnapshotFile(block *model.Block) (snapsho
fromHeight uint32
rows *sql.Rows
)
if qryRepoName == "block" {
if snapshotPayloadHeight > constant.MinRollbackBlocks {
fromHeight = snapshotPayloadHeight - constant.MinRollbackBlocks
}
}
if qryRepoName == "publishedReceipt" {
if snapshotPayloadHeight > constant.LinkedReceiptBlocksLimit {
fromHeight = snapshotPayloadHeight - constant.LinkedReceiptBlocksLimit
}
// if current query repo is blocksmith safe,
// include more blocks to make sure we don't break smithing process due to missing data such as blocks,
// published receipts and node registrations
if ss.BlocksmithSafeQuery[qryRepoName] && snapshotPayloadHeight > constant.MinRollbackBlocks {
fromHeight = snapshotPayloadHeight - constant.MinRollbackBlocks
}
qry := snapshotQuery.SelectDataForSnapshot(fromHeight, snapshotPayloadHeight)
rows, err = ss.QueryExecutor.ExecuteSelect(qry, false)
Expand Down
Loading