diff --git a/cmd/block/blockGenerator.go b/cmd/block/blockGenerator.go index c0da45a03..3524c67b5 100644 --- a/cmd/block/blockGenerator.go +++ b/cmd/block/blockGenerator.go @@ -21,6 +21,12 @@ import ( "github.com/zoobc/zoobc-core/observer" ) +type ( + mockBlockTypeStatusService struct { + service.BlockTypeStatusService + } +) + var ( blocksmith *model.Blocksmith chainType chaintype.ChainType @@ -52,6 +58,14 @@ var ( } ) +func (*mockBlockTypeStatusService) IsFirstDownloadFinished(ct chaintype.ChainType) bool { + return true +} + +func (*mockBlockTypeStatusService) IsDownloading(ct chaintype.ChainType) bool { + return true +} + func init() { fakeBlockCmd.Flags().IntVar( &numberOfBlocks, @@ -194,6 +208,7 @@ func generateBlocks(numberOfBlocks int, blocksmithSecretPhrase, outputPath strin blocksmith, blockService, log.New(), + &mockBlockTypeStatusService{}, ) startTime := time.Now().UnixNano() / 1e6 fmt.Printf("generating %d blocks\n", numberOfBlocks) diff --git a/common/blocker/blocker.go b/common/blocker/blocker.go index 2feb05cc5..04665cf41 100644 --- a/common/blocker/blocker.go +++ b/common/blocker/blocker.go @@ -30,6 +30,7 @@ var ( ServerError TypeBlocker = "ServerError" SmithingErr TypeBlocker = "SmithingErr" ChainValidationErr TypeBlocker = "ChainValidationErr" + P2PNetworkConnectionErr TypeBlocker = "P2PNetworkConnectionErr" ) func NewBlocker(typeBlocker TypeBlocker, message string) error { diff --git a/common/constant/p2p.go b/common/constant/p2p.go index fb183ec56..4c2f50522 100644 --- a/common/constant/p2p.go +++ b/common/constant/p2p.go @@ -28,9 +28,9 @@ const ( // PriorityStrategyMaxStayedInUnresolvedPeers max time a peer can stay before being cycled out from unresolved peers PriorityStrategyMaxStayedInUnresolvedPeers int64 = 120 // BlockchainsyncWaitingTime time, in seconds, to wait before start syncing the blockchain - BlockchainsyncWaitingTime time.Duration = 3 * time.Second - // BlockchainsyncSpineCheckInterval time, in seconds, between checks if spine blocks have finished to be downloaded - BlockchainsyncSpineCheckInterval time.Duration = 3 * time.Second + BlockchainsyncWaitingTime time.Duration = 5 * time.Second + // BlockchainsyncCheckInterval time, in seconds, between checks if spine blocks have finished to be downloaded + BlockchainsyncCheckInterval time.Duration = 10 * time.Second // BlockchainsyncSpineTimeout timeout, in seconds, for spine blocks to be downloaded from the network // FIXME: this is for debugging purposes only and must higher on production, // where downloading the spine blocks could take longer than 30 minutes diff --git a/core/blockchainsync/blockchainSync.go b/core/blockchainsync/blockchainSync.go index ae9af8c34..c1b0971ca 100644 --- a/core/blockchainsync/blockchainSync.go +++ b/core/blockchainsync/blockchainSync.go @@ -2,14 +2,13 @@ package blockchainsync import ( "fmt" + "github.com/zoobc/zoobc-core/common/blocker" "time" log "github.com/sirupsen/logrus" "github.com/zoobc/zoobc-core/common/chaintype" "github.com/zoobc/zoobc-core/common/constant" - "github.com/zoobc/zoobc-core/common/kvdb" "github.com/zoobc/zoobc-core/common/model" - "github.com/zoobc/zoobc-core/common/query" "github.com/zoobc/zoobc-core/common/transaction" "github.com/zoobc/zoobc-core/core/service" "github.com/zoobc/zoobc-core/p2p/client" @@ -19,53 +18,35 @@ import ( // TODO: rename into something more specific, such as SyncService type Service struct { // isScanningBlockchain bool - ChainType chaintype.ChainType - PeerServiceClient client.PeerServiceClientInterface - PeerExplorer strategy.PeerExplorerStrategyInterface - BlockService service.BlockServiceInterface - BlockchainDownloader BlockchainDownloadInterface - ForkingProcessor ForkingProcessorInterface - Logger *log.Logger - TransactionUtil transaction.UtilInterface + ChainType chaintype.ChainType + PeerServiceClient client.PeerServiceClientInterface + PeerExplorer strategy.PeerExplorerStrategyInterface + BlockService service.BlockServiceInterface + BlockchainDownloader BlockchainDownloadInterface + ForkingProcessor ForkingProcessorInterface + Logger *log.Logger + TransactionUtil transaction.UtilInterface + BlockTypeStatusService service.BlockTypeStatusServiceInterface } func NewBlockchainSyncService( blockService service.BlockServiceInterface, peerServiceClient client.PeerServiceClientInterface, peerExplorer strategy.PeerExplorerStrategyInterface, - queryExecutor query.ExecutorInterface, - mempoolService service.MempoolServiceInterface, - txActionSwitcher transaction.TypeActionSwitcher, logger *log.Logger, - kvdb kvdb.KVExecutorInterface, - transactionUtil transaction.UtilInterface, - transactionCoreService service.TransactionCoreServiceInterface, + blockTypeStatusService service.BlockTypeStatusServiceInterface, + blockchainDownloader BlockchainDownloadInterface, + forkingProcessor ForkingProcessorInterface, ) *Service { return &Service{ - ChainType: blockService.GetChainType(), - BlockService: blockService, - PeerServiceClient: peerServiceClient, - PeerExplorer: peerExplorer, - BlockchainDownloader: &BlockchainDownloader{ - ChainType: blockService.GetChainType(), - BlockService: blockService, - PeerServiceClient: peerServiceClient, - PeerExplorer: peerExplorer, - Logger: logger, - }, - ForkingProcessor: &ForkingProcessor{ - ChainType: blockService.GetChainType(), - BlockService: blockService, - QueryExecutor: queryExecutor, - ActionTypeSwitcher: txActionSwitcher, - MempoolService: mempoolService, - KVExecutor: kvdb, - PeerExplorer: peerExplorer, - Logger: logger, - TransactionUtil: transactionUtil, - TransactionCorService: transactionCoreService, - }, - Logger: logger, + ChainType: blockService.GetChainType(), + BlockService: blockService, + PeerServiceClient: peerServiceClient, + PeerExplorer: peerExplorer, + BlockchainDownloader: blockchainDownloader, + ForkingProcessor: forkingProcessor, + Logger: logger, + BlockTypeStatusService: blockTypeStatusService, } } @@ -120,8 +101,23 @@ func (bss *Service) getMoreBlocks() { needDownloadBlock := true peerBlockchainInfo, err = bss.BlockchainDownloader.GetPeerBlockchainInfo() if err != nil { - bss.Logger.Infof("\nfailed to getPeerBlockchainInfo: %v\n\n", err) needDownloadBlock = false + errCasted := err.(blocker.Blocker) + switch errCasted.Type { + case blocker.P2PNetworkConnectionErr: + // this will allow the node to start smithing if it fails to connect to the p2p network, + // eg. he is the first node. if later on he can connect, it will try resolve the fork normally + bss.BlockTypeStatusService.SetFirstDownloadFinished(bss.ChainType, true) + bss.Logger.Info(errCasted.Message) + case blocker.ChainValidationErr: + bss.Logger.Infof("peer %s:%d: %s", + peerBlockchainInfo.Peer.GetInfo().Address, + peerBlockchainInfo.Peer.GetInfo().Port, + errCasted.Message) + default: + bss.Logger.Infof("failed to getPeerBlockchainInfo: %v", err) + + } } newLastBlock = nil @@ -177,7 +173,7 @@ func (bss *Service) getMoreBlocks() { } if bss.BlockchainDownloader.IsDownloadFinish(lastBlock) { - bss.BlockchainDownloader.SetIsDownloading(false) + bss.BlockTypeStatusService.SetIsDownloading(bss.ChainType, false) bss.Logger.Infof("Finished %s blockchain download: %d blocks pulled", bss.ChainType.GetName(), lastBlock.Height-initialHeight) break } diff --git a/core/blockchainsync/downloadBlockchain.go b/core/blockchainsync/downloadBlockchain.go index 589e1dcfc..014473c96 100644 --- a/core/blockchainsync/downloadBlockchain.go +++ b/core/blockchainsync/downloadBlockchain.go @@ -20,20 +20,19 @@ import ( type ( BlockchainDownloadInterface interface { - SetIsDownloading(newValue bool) IsDownloadFinish(currentLastBlock *model.Block) bool GetPeerBlockchainInfo() (*PeerBlockchainInfo, error) DownloadFromPeer(feederPeer *model.Peer, chainBlockIds []int64, commonBlock *model.Block) (*PeerForkInfo, error) ConfirmWithPeer(peerToCheck *model.Peer, commonMilestoneBlockID int64) ([]int64, error) } BlockchainDownloader struct { - IsDownloading bool // only for status - PeerHasMore bool - ChainType chaintype.ChainType - BlockService service.BlockServiceInterface - PeerServiceClient client.PeerServiceClientInterface - PeerExplorer strategy.PeerExplorerStrategyInterface - Logger *log.Logger + PeerHasMore bool + ChainType chaintype.ChainType + BlockService service.BlockServiceInterface + PeerServiceClient client.PeerServiceClientInterface + PeerExplorer strategy.PeerExplorerStrategyInterface + Logger *log.Logger + BlockTypeStatusService service.BlockTypeStatusServiceInterface } PeerBlockchainInfo struct { @@ -49,6 +48,23 @@ type ( } ) +func NewBlockchainDownloader( + blockService service.BlockServiceInterface, + peerServiceClient client.PeerServiceClientInterface, + peerExplorer strategy.PeerExplorerStrategyInterface, + logger *log.Logger, + blockTypeStatusService service.BlockTypeStatusServiceInterface, +) *BlockchainDownloader { + return &BlockchainDownloader{ + ChainType: blockService.GetChainType(), + BlockService: blockService, + PeerServiceClient: peerServiceClient, + PeerExplorer: peerExplorer, + Logger: logger, + BlockTypeStatusService: blockTypeStatusService, + } +} + func (bd *BlockchainDownloader) IsDownloadFinish(currentLastBlock *model.Block) bool { currentHeight := currentLastBlock.Height currentCumulativeDifficulty := currentLastBlock.CumulativeDifficulty @@ -60,15 +76,13 @@ func (bd *BlockchainDownloader) IsDownloadFinish(currentLastBlock *model.Block) heightAfterDownload := afterDownloadLastBlock.Height cumulativeDifficultyAfterDownload := afterDownloadLastBlock.CumulativeDifficulty if currentHeight > 0 && currentHeight == heightAfterDownload && currentCumulativeDifficulty == cumulativeDifficultyAfterDownload { + // we only initialize this flag (to false) in main, so once is set to true, it will always be true + bd.BlockTypeStatusService.SetFirstDownloadFinished(bd.ChainType, true) return true } return false } -func (bd *BlockchainDownloader) SetIsDownloading(newValue bool) { - bd.IsDownloading = newValue -} - func (bd *BlockchainDownloader) GetPeerBlockchainInfo() (*PeerBlockchainInfo, error) { var ( err error @@ -79,11 +93,15 @@ func (bd *BlockchainDownloader) GetPeerBlockchainInfo() (*PeerBlockchainInfo, er bd.PeerHasMore = true peer := bd.PeerExplorer.GetAnyResolvedPeer() if peer == nil { - return nil, errors.New("no connected peer can be found") + return nil, blocker.NewBlocker(blocker.P2PNetworkConnectionErr, "no connected peer can be found") } peerCumulativeDifficultyResponse, err = bd.PeerServiceClient.GetCumulativeDifficulty(peer, bd.ChainType) if err != nil { - return nil, fmt.Errorf("failed to get Cumulative Difficulty of peer %v: %v", peer.Info.Address, err) + return &PeerBlockchainInfo{ + Peer: peer, + CommonBlock: commonBlock, + }, blocker.NewBlocker(blocker.ChainValidationErr, + fmt.Sprintf("failed to get Cumulative Difficulty of peer %v: %v", peer.Info.Address, err)) } peerCumulativeDifficulty, _ := new(big.Int).SetString(peerCumulativeDifficultyResponse.CumulativeDifficulty, 10) @@ -91,7 +109,7 @@ func (bd *BlockchainDownloader) GetPeerBlockchainInfo() (*PeerBlockchainInfo, er lastBlock, err = bd.BlockService.GetLastBlock() if err != nil { - return nil, err + return nil, blocker.NewBlocker(blocker.DBErr, err.Error()) } lastBlockCumulativeDifficulty, _ := new(big.Int).SetString(lastBlock.CumulativeDifficulty, 10) lastBlockHeight := lastBlock.Height @@ -99,8 +117,11 @@ func (bd *BlockchainDownloader) GetPeerBlockchainInfo() (*PeerBlockchainInfo, er if peerCumulativeDifficulty == nil || lastBlockCumulativeDifficulty == nil || peerCumulativeDifficulty.Cmp(lastBlockCumulativeDifficulty) <= 0 { - return nil, fmt.Errorf("peer's cumulative difficulty %s:%v is lower/same with the current node's", - peer.GetInfo().Address, peer.GetInfo().Port) + return &PeerBlockchainInfo{ + Peer: peer, + CommonBlock: commonBlock, + }, blocker.NewBlocker(blocker.ChainValidationErr, + "cumulative difficulty is lower/same with the current node's") } commonMilestoneBlockID := bd.ChainType.GetGenesisBlockID() @@ -113,22 +134,31 @@ func (bd *BlockchainDownloader) GetPeerBlockchainInfo() (*PeerBlockchainInfo, er chainBlockIds := bd.getBlockIdsAfterCommon(peer, commonMilestoneBlockID) if len(chainBlockIds) < 2 || !bd.PeerHasMore { - return nil, errors.New("the peer does not have more updated chain") + return &PeerBlockchainInfo{ + Peer: peer, + CommonBlock: commonBlock, + }, blocker.NewBlocker(blocker.ChainValidationErr, "the peer does not have more updated chain") } commonBlockID := chainBlockIds[0] commonBlock, err = bd.BlockService.GetBlockByID(commonBlockID, false) if err != nil { - bd.Logger.Infof("common block %v not found, milestone block id: %v", commonBlockID, commonMilestoneBlockID) - return nil, err + return &PeerBlockchainInfo{ + Peer: peer, + CommonBlock: commonBlock, + }, blocker.NewBlocker(blocker.ChainValidationErr, fmt.Sprintf("common block %v not found, milestone block id: %v", + commonBlockID, commonMilestoneBlockID)) } if commonBlock == nil || lastBlockHeight-commonBlock.GetHeight() >= constant.MinRollbackBlocks { - return nil, errors.New("invalid common block") + return &PeerBlockchainInfo{ + Peer: peer, + CommonBlock: commonBlock, + }, blocker.NewBlocker(blocker.ChainValidationErr, "invalid common block") } - if !bd.IsDownloading && peerHeight-commonBlock.GetHeight() > 10 { + if !bd.BlockTypeStatusService.IsDownloading(bd.ChainType) && peerHeight-commonBlock.GetHeight() > 10 { bd.Logger.Info("Blockchain download in progress") - bd.IsDownloading = true + bd.BlockTypeStatusService.SetIsDownloading(bd.ChainType, true) } return &PeerBlockchainInfo{ diff --git a/core/blockchainsync/downloadBlockchain_test.go b/core/blockchainsync/downloadBlockchain_test.go index 2b65601ba..1f5394209 100644 --- a/core/blockchainsync/downloadBlockchain_test.go +++ b/core/blockchainsync/downloadBlockchain_test.go @@ -131,14 +131,30 @@ func (*mockBlockServiceFail) GetLastBlock() (*model.Block, error) { return nil, blocker.NewBlocker(blocker.BlockNotFoundErr, fmt.Sprintf("block is not found")) } +type ( + mockBlockTypeStatusService struct { + coreService.BlockTypeStatusService + } +) + +func (*mockBlockTypeStatusService) IsFirstDownloadFinished(ct chaintype.ChainType) bool { + return true +} + +func (*mockBlockTypeStatusService) IsDownloading(ct chaintype.ChainType) bool { + return true +} + func TestGetPeerCommonBlockID(t *testing.T) { type args struct { - PeerServiceClient client.PeerServiceClientInterface - PeerExplorer strategy.PeerExplorerStrategyInterface - blockService coreService.BlockServiceInterface - queryService query.ExecutorInterface - logger *log.Logger + PeerServiceClient client.PeerServiceClientInterface + PeerExplorer strategy.PeerExplorerStrategyInterface + blockService coreService.BlockServiceInterface + queryService query.ExecutorInterface + logger *log.Logger + blockTypeStatusService coreService.BlockTypeStatusServiceInterface } + tests := []struct { name string args args @@ -148,11 +164,12 @@ func TestGetPeerCommonBlockID(t *testing.T) { { name: "want:getPeerCommonBlockID successfully return common block ID", args: args{ - PeerServiceClient: &mockP2pServiceSuccess{}, - PeerExplorer: &mockPeerExplorer{}, - blockService: &mockBlockServiceSuccess{}, - queryService: &mockQueryServiceSuccess{}, - logger: log.New(), + PeerServiceClient: &mockP2pServiceSuccess{}, + PeerExplorer: &mockPeerExplorer{}, + blockService: &mockBlockServiceSuccess{}, + queryService: &mockQueryServiceSuccess{}, + logger: log.New(), + blockTypeStatusService: &mockBlockTypeStatusService{}, }, want: int64(1), wantErr: false, @@ -160,11 +177,12 @@ func TestGetPeerCommonBlockID(t *testing.T) { { name: "wantErr:getPeerCommonBlockID get last block failed", args: args{ - PeerServiceClient: &mockP2pServiceSuccess{}, - PeerExplorer: &mockPeerExplorer{}, - blockService: &mockBlockServiceFail{}, - queryService: &mockQueryServiceSuccess{}, - logger: log.New(), + PeerServiceClient: &mockP2pServiceSuccess{}, + PeerExplorer: &mockPeerExplorer{}, + blockService: &mockBlockServiceFail{}, + queryService: &mockQueryServiceSuccess{}, + logger: log.New(), + blockTypeStatusService: &mockBlockTypeStatusService{}, }, want: int64(0), wantErr: true, @@ -172,11 +190,12 @@ func TestGetPeerCommonBlockID(t *testing.T) { { name: "wantErr:getPeerCommonBlockID grpc error", args: args{ - PeerServiceClient: &mockP2pServiceFail{}, - PeerExplorer: &mockPeerExplorer{}, - blockService: &mockBlockServiceSuccess{}, - queryService: &mockQueryServiceSuccess{}, - logger: log.New(), + PeerServiceClient: &mockP2pServiceFail{}, + PeerExplorer: &mockPeerExplorer{}, + blockService: &mockBlockServiceSuccess{}, + queryService: &mockQueryServiceSuccess{}, + logger: log.New(), + blockTypeStatusService: &mockBlockTypeStatusService{}, }, want: int64(0), wantErr: true, @@ -186,10 +205,11 @@ func TestGetPeerCommonBlockID(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { blockchainDownloader := &BlockchainDownloader{ - BlockService: tt.args.blockService, - PeerServiceClient: tt.args.PeerServiceClient, - PeerExplorer: tt.args.PeerExplorer, - Logger: tt.args.logger, + BlockService: tt.args.blockService, + PeerServiceClient: tt.args.PeerServiceClient, + PeerExplorer: tt.args.PeerExplorer, + Logger: tt.args.logger, + BlockTypeStatusService: tt.args.blockTypeStatusService, } got, err := blockchainDownloader.getPeerCommonBlockID( &model.Peer{}, @@ -207,10 +227,11 @@ func TestGetPeerCommonBlockID(t *testing.T) { func TestGetBlockIdsAfterCommon(t *testing.T) { type args struct { - PeerServiceClient client.PeerServiceClientInterface - PeerExplorer strategy.PeerExplorerStrategyInterface - blockService coreService.BlockServiceInterface - queryService query.ExecutorInterface + PeerServiceClient client.PeerServiceClientInterface + PeerExplorer strategy.PeerExplorerStrategyInterface + blockService coreService.BlockServiceInterface + queryService query.ExecutorInterface + blockTypeStatusService coreService.BlockTypeStatusServiceInterface } tests := []struct { @@ -221,40 +242,44 @@ func TestGetBlockIdsAfterCommon(t *testing.T) { { name: "want:getBlockIdsAfterCommon (all getBlockIdsAfterCommon new)", args: args{ - PeerServiceClient: &mockP2pServiceSuccessNewResult{}, - PeerExplorer: &mockPeerExplorer{}, - blockService: &mockBlockServiceSuccess{}, - queryService: &mockQueryServiceSuccess{}, + PeerServiceClient: &mockP2pServiceSuccessNewResult{}, + PeerExplorer: &mockPeerExplorer{}, + blockService: &mockBlockServiceSuccess{}, + queryService: &mockQueryServiceSuccess{}, + blockTypeStatusService: &mockBlockTypeStatusService{}, }, want: []int64{3, 4}, }, { name: "want:getBlockIdsAfterCommon (some getBlockIdsAfterCommon already exists)", args: args{ - PeerServiceClient: &mockP2pServiceSuccess{}, - PeerExplorer: &mockPeerExplorer{}, - blockService: &mockBlockServiceSuccess{}, - queryService: &mockQueryServiceSuccess{}, + PeerServiceClient: &mockP2pServiceSuccess{}, + PeerExplorer: &mockPeerExplorer{}, + blockService: &mockBlockServiceSuccess{}, + queryService: &mockQueryServiceSuccess{}, + blockTypeStatusService: &mockBlockTypeStatusService{}, }, want: []int64{2, 3, 4}, }, { name: "want:getBlockIdsAfterCommon (all getBlockIdsAfterCommon already exists)", args: args{ - PeerServiceClient: &mockP2pServiceSuccessOneResult{}, - PeerExplorer: &mockPeerExplorer{}, - blockService: &mockBlockServiceSuccess{}, - queryService: &mockQueryServiceSuccess{}, + PeerServiceClient: &mockP2pServiceSuccessOneResult{}, + PeerExplorer: &mockPeerExplorer{}, + blockService: &mockBlockServiceSuccess{}, + queryService: &mockQueryServiceSuccess{}, + blockTypeStatusService: &mockBlockTypeStatusService{}, }, want: []int64{1}, }, { name: "want:getBlockIdsAfterCommon (GetNextBlockIDs produce error)", args: args{ - PeerServiceClient: &mockP2pServiceFail{}, - PeerExplorer: &mockPeerExplorer{}, - blockService: &mockBlockServiceSuccess{}, - queryService: &mockQueryServiceSuccess{}, + PeerServiceClient: &mockP2pServiceFail{}, + PeerExplorer: &mockPeerExplorer{}, + blockService: &mockBlockServiceSuccess{}, + queryService: &mockQueryServiceSuccess{}, + blockTypeStatusService: &mockBlockTypeStatusService{}, }, want: []int64{}, }, @@ -263,9 +288,10 @@ func TestGetBlockIdsAfterCommon(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { blockchainDownloader := &BlockchainDownloader{ - BlockService: tt.args.blockService, - PeerServiceClient: tt.args.PeerServiceClient, - PeerExplorer: tt.args.PeerExplorer, + BlockService: tt.args.blockService, + PeerServiceClient: tt.args.PeerServiceClient, + PeerExplorer: tt.args.PeerExplorer, + BlockTypeStatusService: tt.args.blockTypeStatusService, } got := blockchainDownloader.getBlockIdsAfterCommon( &model.Peer{}, @@ -311,9 +337,10 @@ func TestGetNextBlocks(t *testing.T) { nil, ) blockchainDownloader := &BlockchainDownloader{ - BlockService: blockService, - PeerServiceClient: &mockP2pServiceSuccess{}, - PeerExplorer: &mockPeerExplorer{}, + BlockService: blockService, + PeerServiceClient: &mockP2pServiceSuccess{}, + PeerExplorer: &mockPeerExplorer{}, + BlockTypeStatusService: &mockBlockTypeStatusService{}, } type args struct { diff --git a/core/service/blockTypeStatusService.go b/core/service/blockTypeStatusService.go new file mode 100644 index 000000000..b0eeb253f --- /dev/null +++ b/core/service/blockTypeStatusService.go @@ -0,0 +1,48 @@ +package service + +import "github.com/zoobc/zoobc-core/common/chaintype" + +type ( + BlockTypeStatusServiceInterface interface { + SetFirstDownloadFinished(ct chaintype.ChainType, isSpineBlocksDownloadFinished bool) + IsFirstDownloadFinished(ct chaintype.ChainType) bool + SetIsDownloading(ct chaintype.ChainType, newValue bool) + IsDownloading(ct chaintype.ChainType) bool + } +) + +type ( + BlockTypeStatusService struct { + isFirstDownloadFinished map[int32]bool + isDownloading map[int32]bool + } +) + +func NewBlockTypeStatusService() *BlockTypeStatusService { + // init variables for all block types + var btss = &BlockTypeStatusService{ + isDownloading: make(map[int32]bool), + isFirstDownloadFinished: make(map[int32]bool), + } + for _, ct := range chaintype.GetChainTypes() { + btss.isDownloading[ct.GetTypeInt()] = false + btss.isFirstDownloadFinished[ct.GetTypeInt()] = false + } + return btss +} + +func (btss *BlockTypeStatusService) SetFirstDownloadFinished(ct chaintype.ChainType, finished bool) { + btss.isFirstDownloadFinished[ct.GetTypeInt()] = finished +} + +func (btss *BlockTypeStatusService) IsFirstDownloadFinished(ct chaintype.ChainType) bool { + return btss.isFirstDownloadFinished[ct.GetTypeInt()] +} + +func (btss *BlockTypeStatusService) SetIsDownloading(ct chaintype.ChainType, newValue bool) { + btss.isDownloading[ct.GetTypeInt()] = newValue +} + +func (btss *BlockTypeStatusService) IsDownloading(ct chaintype.ChainType) bool { + return btss.isDownloading[ct.GetTypeInt()] +} diff --git a/core/service/snapshotMainBlockService.go b/core/service/snapshotMainBlockService.go index 6975b6944..75174123b 100644 --- a/core/service/snapshotMainBlockService.go +++ b/core/service/snapshotMainBlockService.go @@ -65,14 +65,14 @@ func (ss *SnapshotMainBlockService) NewSnapshotFile(block *model.Block) (snapsho fileChunkHashes [][]byte snapshotPayload = new(model.SnapshotPayload) snapshotExpirationTimestamp = block.Timestamp + int64(ss.chainType.GetSnapshotGenerationTimeout().Seconds()) - // (safe) height to get snapshot's data from - snapshotPayloadHeight int = int(block.Height) - int(constant.MinRollbackBlocks) ) - if snapshotPayloadHeight <= 0 { + if block.Height <= constant.MinRollbackBlocks { return nil, blocker.NewBlocker(blocker.ValidationErr, - fmt.Sprintf("invalid snapshot height: %d", snapshotPayloadHeight)) + fmt.Sprintf("invalid snapshot height: %d", block.Height)) } + // (safe) height to get snapshot's data from + snapshotPayloadHeight := block.Height - constant.MinRollbackBlocks for qryRepoName, snapshotQuery := range ss.SnapshotQueries { func() { @@ -81,11 +81,11 @@ func (ss *SnapshotMainBlockService) NewSnapshotFile(block *model.Block) (snapsho rows *sql.Rows ) if qryRepoName == "publishedReceipt" { - if uint32(snapshotPayloadHeight) > constant.LinkedReceiptBlocksLimit { - fromHeight = uint32(snapshotPayloadHeight) - constant.LinkedReceiptBlocksLimit + if snapshotPayloadHeight > constant.LinkedReceiptBlocksLimit { + fromHeight = snapshotPayloadHeight - constant.LinkedReceiptBlocksLimit } } - qry := snapshotQuery.SelectDataForSnapshot(fromHeight, uint32(snapshotPayloadHeight)) + qry := snapshotQuery.SelectDataForSnapshot(fromHeight, snapshotPayloadHeight) rows, err = ss.QueryExecutor.ExecuteSelect(qry, false) if err != nil { return diff --git a/core/service/snapshotService.go b/core/service/snapshotService.go index d209df130..406feecc2 100644 --- a/core/service/snapshotService.go +++ b/core/service/snapshotService.go @@ -19,13 +19,13 @@ type ( GenerateSnapshot(block *model.Block, ct chaintype.ChainType, chunkSizeBytes int) (*model.SnapshotFileInfo, error) IsSnapshotProcessing(ct chaintype.ChainType) bool StopSnapshotGeneration(ct chaintype.ChainType) error - DownloadSnapshot(spineBlockManifest *model.SpineBlockManifest) error + DownloadSnapshot(ct chaintype.ChainType, spineBlockManifest *model.SpineBlockManifest) error StartSnapshotListener() observer.Listener } SnapshotService struct { SpineBlockManifestService SpineBlockManifestServiceInterface - SpineBlockDownloadService SpineBlockDownloadServiceInterface + BlockTypeStatusService BlockTypeStatusServiceInterface SnapshotBlockServices map[int32]SnapshotBlockServiceInterface // map key = chaintype number (eg. mainchain = 0) FileDownloaderService FileDownloaderServiceInterface FileService FileServiceInterface @@ -42,7 +42,7 @@ var ( func NewSnapshotService( spineBlockManifestService SpineBlockManifestServiceInterface, - spineBlockDownloadService SpineBlockDownloadServiceInterface, + blockTypeStatusService BlockTypeStatusServiceInterface, snapshotBlockServices map[int32]SnapshotBlockServiceInterface, fileDownloaderService FileDownloaderServiceInterface, fileService FileServiceInterface, @@ -50,7 +50,7 @@ func NewSnapshotService( ) *SnapshotService { return &SnapshotService{ SpineBlockManifestService: spineBlockManifestService, - SpineBlockDownloadService: spineBlockDownloadService, + BlockTypeStatusService: blockTypeStatusService, SnapshotBlockServices: snapshotBlockServices, FileDownloaderService: fileDownloaderService, FileService: fileService, @@ -112,9 +112,10 @@ func (ss *SnapshotService) StartSnapshotListener() observer.Listener { } if snapshotBlockService.IsSnapshotHeight(block.Height) { go func() { - // if spine blocks is downloading, do not generate (or download from other peers) snapshots - // don't generate snapshots until all spine blocks have been downloaded - if !ss.SpineBlockDownloadService.IsSpineBlocksDownloadFinished() { + // if spine and main blocks are still downloading, after the node has started, + // do not generate (or download from other peers) snapshots + if !ss.BlockTypeStatusService.IsFirstDownloadFinished(&chaintype.MainChain{}) && !ss. + BlockTypeStatusService.IsFirstDownloadFinished(&chaintype.SpineChain{}) { ss.Logger.Infof("Snapshot at block "+ "height %d not generated because spine blocks are still downloading", block.Height) @@ -152,7 +153,7 @@ func (ss *SnapshotService) StartSnapshotListener() observer.Listener { } } -func (ss *SnapshotService) DownloadSnapshot(spineBlockManifest *model.SpineBlockManifest) error { +func (ss *SnapshotService) DownloadSnapshot(ct chaintype.ChainType, spineBlockManifest *model.SpineBlockManifest) error { var ( failedDownloadChunkNames = make([]string, 0) hashSize = sha3.New256().Size() diff --git a/core/service/snapshotService_test.go b/core/service/snapshotService_test.go index dce990d92..760949753 100644 --- a/core/service/snapshotService_test.go +++ b/core/service/snapshotService_test.go @@ -131,87 +131,6 @@ func (*mockMainchain) GetSmithingPeriod() int64 { return 15 } -// FIXME: uncomment and fix the test once this method is completed -// func TestSnapshotService_GenerateSnapshot(t *testing.T) { -// type fields struct { -// QueryExecutor query.ExecutorInterface -// SpineBlockManifestQuery query.SpineBlockManifestQueryInterface -// SpineBlockQuery query.BlockQueryInterface -// MainBlockQuery query.BlockQueryInterface -// FileChunkQuery query.FileChunkQueryInterface -// Logger *log.Logger -// Spinechain chaintype.ChainType -// Mainchain chaintype.ChainType -// MainchainSnapshotInterval int64 -// SnapshotGenerationTimeout int64 -// } -// type args struct { -// mainHeight uint32 -// ct chaintype.ChainType -// } -// tests := []struct { -// name string -// fields fields -// args args -// want *model.SpineBlockManifest -// wantErr bool -// }{ -// { -// name: "GenerateSnapshot:success", -// fields: fields{ -// QueryExecutor: &mockSnapshotServiceQueryExecutor{ -// testName: "GenerateSnapshot:success", -// }, -// SpineBlockQuery: query.NewBlockQuery(ssSpinechain), -// MainBlockQuery: query.NewBlockQuery(ssMainchain), -// SpineBlockManifestQuery: query.NewSpineBlockManifestQuery(), -// FileChunkQuery: query.NewFileChunkQuery(), -// Logger: log.New(), -// Spinechain: &mockSpinechain{}, -// Mainchain: &mockMainchain{}, -// MainchainSnapshotInterval: ssSnapshotInterval, -// SnapshotGenerationTimeout: ssSnapshotGenerationTimeout, -// }, -// args: args{ -// mainHeight: ssMockMainBlock.Height, -// ct: &chaintype.MainChain{}, -// }, -// wantErr: false, -// want: &model.SpineBlockManifest{ -// ID: int64(1919891213155270003), -// FullFileHash: make([]byte, 64), -// SpineBlockManifestHeight: ssMockMainBlock.Height, -// SpineBlockHeight: uint32(419), -// FileChunks: make([]*model.FileChunk, 0), -// }, -// }, -// } -// for _, tt := range tests { -// t.Run(tt.name, func(t *testing.T) { -// ss := &SnapshotService{ -// QueryExecutor: tt.fields.QueryExecutor, -// SpineBlockManifestQuery: tt.fields.SpineBlockManifestQuery, -// SpineBlockQuery: tt.fields.SpineBlockQuery, -// MainBlockQuery: tt.fields.MainBlockQuery, -// FileChunkQuery: tt.fields.FileChunkQuery, -// Logger: tt.fields.Logger, -// Spinechain: tt.fields.Spinechain, -// Mainchain: tt.fields.Mainchain, -// MainchainSnapshotInterval: tt.fields.MainchainSnapshotInterval, -// SnapshotGenerationTimeout: tt.fields.SnapshotGenerationTimeout, -// } -// got, err := ss.GenerateSnapshot(tt.args.mainHeight, tt.args.ct) -// if (err != nil) != tt.wantErr { -// t.Errorf("SnapshotService.GenerateSnapshot() error = %v, wantErr %v", err, tt.wantErr) -// return -// } -// if !reflect.DeepEqual(got, tt.want) { -// t.Errorf("SnapshotService.GenerateSnapshot() = %v, want %v", got, tt.want) -// } -// }) -// } -// } - type ( mockFileDownloaderService struct { FileDownloaderService @@ -229,7 +148,7 @@ func (mfdf *mockFileDownloaderService) DownloadFileByName(fileName string, fileH func TestSnapshotService_DownloadSnapshot(t *testing.T) { type fields struct { SpineBlockManifestService SpineBlockManifestServiceInterface - SpineBlockDownloadService SpineBlockDownloadServiceInterface + BlockTypeStatusService BlockTypeStatusServiceInterface SnapshotBlockServices map[int32]SnapshotBlockServiceInterface FileDownloaderService FileDownloaderServiceInterface FileService FileServiceInterface @@ -237,6 +156,7 @@ func TestSnapshotService_DownloadSnapshot(t *testing.T) { } type args struct { spineBlockManifest *model.SpineBlockManifest + ct chaintype.ChainType } tests := []struct { name string @@ -251,6 +171,7 @@ func TestSnapshotService_DownloadSnapshot(t *testing.T) { spineBlockManifest: &model.SpineBlockManifest{ FileChunkHashes: make([]byte, 0), }, + ct: &chaintype.MainChain{}, }, wantErr: true, errMsg: "ValidationErr: invalid file chunks hashes length", @@ -270,6 +191,7 @@ func TestSnapshotService_DownloadSnapshot(t *testing.T) { spineBlockManifest: &model.SpineBlockManifest{ FileChunkHashes: make([]byte, 64), }, + ct: &chaintype.MainChain{}, }, wantErr: true, errMsg: "AppErr: One or more snapshot chunks failed to download [AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + @@ -290,6 +212,7 @@ func TestSnapshotService_DownloadSnapshot(t *testing.T) { spineBlockManifest: &model.SpineBlockManifest{ FileChunkHashes: make([]byte, 64), }, + ct: &chaintype.MainChain{}, }, }, } @@ -297,13 +220,13 @@ func TestSnapshotService_DownloadSnapshot(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ss := &SnapshotService{ SpineBlockManifestService: tt.fields.SpineBlockManifestService, - SpineBlockDownloadService: tt.fields.SpineBlockDownloadService, + BlockTypeStatusService: tt.fields.BlockTypeStatusService, SnapshotBlockServices: tt.fields.SnapshotBlockServices, FileDownloaderService: tt.fields.FileDownloaderService, FileService: tt.fields.FileService, Logger: tt.fields.Logger, } - if err := ss.DownloadSnapshot(tt.args.spineBlockManifest); err != nil { + if err := ss.DownloadSnapshot(tt.args.ct, tt.args.spineBlockManifest); err != nil { if !tt.wantErr { t.Errorf("SnapshotService.DownloadSnapshot() error = %v, wantErr %v", err, tt.wantErr) } diff --git a/core/service/spineBlockDownloadService.go b/core/service/spineBlockDownloadService.go deleted file mode 100644 index 770455665..000000000 --- a/core/service/spineBlockDownloadService.go +++ /dev/null @@ -1,26 +0,0 @@ -package service - -type ( - SpineBlockDownloadServiceInterface interface { - SetSpineBlocksDownloadFinished(isSpineBlocksDownloadFinished bool) - IsSpineBlocksDownloadFinished() bool - } - - SpineBlockDownloadService struct { - isSpineBlocksDownloadFinished bool - } -) - -func NewSpineBlockDownloadService() *SpineBlockDownloadService { - return &SpineBlockDownloadService{ - isSpineBlocksDownloadFinished: false, - } -} - -func (sbds *SpineBlockDownloadService) SetSpineBlocksDownloadFinished(isSpineBlocksDownloadFinished bool) { - sbds.isSpineBlocksDownloadFinished = isSpineBlocksDownloadFinished -} - -func (sbds *SpineBlockDownloadService) IsSpineBlocksDownloadFinished() bool { - return sbds.isSpineBlocksDownloadFinished -} diff --git a/core/smith/blockchainProcessor.go b/core/smith/blockchainProcessor.go index 15035cb4d..6198f61d6 100644 --- a/core/smith/blockchainProcessor.go +++ b/core/smith/blockchainProcessor.go @@ -1,6 +1,7 @@ package smith import ( + "github.com/zoobc/zoobc-core/common/chaintype" "time" log "github.com/sirupsen/logrus" @@ -22,12 +23,13 @@ type ( // BlockchainProcessor handle smithing process, can be switch to process different chain by supplying different chain type BlockchainProcessor struct { - Generator *model.Blocksmith - BlockService service.BlockServiceInterface - LastBlockID int64 - Logger *log.Logger - isSmithing bool - smithError error + Generator *model.Blocksmith + BlockService service.BlockServiceInterface + LastBlockID int64 + Logger *log.Logger + isSmithing bool + smithError error + BlockTypeStatusService service.BlockTypeStatusServiceInterface } ) @@ -40,11 +42,13 @@ func NewBlockchainProcessor( blocksmith *model.Blocksmith, blockService service.BlockServiceInterface, logger *log.Logger, + blockTypeStatusService service.BlockTypeStatusServiceInterface, ) *BlockchainProcessor { return &BlockchainProcessor{ - Generator: blocksmith, - BlockService: blockService, - Logger: logger, + Generator: blocksmith, + BlockService: blockService, + Logger: logger, + BlockTypeStatusService: blockTypeStatusService, } } @@ -162,6 +166,9 @@ func (bp *BlockchainProcessor) StartSmithing() error { // Start starts the blockchainProcessor func (bp *BlockchainProcessor) Start(sleepPeriod int) { + var ( + mainchain = &chaintype.MainChain{} + ) ticker := time.NewTicker(time.Duration(sleepPeriod) * time.Millisecond) stopSmith = make(chan bool) go func() { @@ -174,14 +181,17 @@ func (bp *BlockchainProcessor) Start(sleepPeriod int) { bp.smithError = nil return case <-ticker.C: - err := bp.StartSmithing() - if err != nil { - bp.Logger.Debugf("Smith Error for %s. %s", bp.BlockService.GetChainType().GetName(), err.Error()) - bp.isSmithing = false - bp.smithError = err + // when starting a node, do not start smithing until the main blocks have been fully downloaded + if bp.BlockTypeStatusService.IsFirstDownloadFinished(mainchain) { + err := bp.StartSmithing() + if err != nil { + bp.Logger.Debugf("Smith Error for %s. %s", bp.BlockService.GetChainType().GetName(), err.Error()) + bp.isSmithing = false + bp.smithError = err + } + bp.isSmithing = true + bp.smithError = nil } - bp.isSmithing = true - bp.smithError = nil } } }() diff --git a/core/smith/blockchainProcessor_test.go b/core/smith/blockchainProcessor_test.go index 351e0752d..8a7454671 100644 --- a/core/smith/blockchainProcessor_test.go +++ b/core/smith/blockchainProcessor_test.go @@ -13,9 +13,10 @@ import ( func TestNewBlockchainProcessor(t *testing.T) { type args struct { - blocksmith *model.Blocksmith - blockService service.BlockServiceInterface - logger *log.Logger + blocksmith *model.Blocksmith + blockService service.BlockServiceInterface + logger *log.Logger + blockTypeStatusService service.BlockTypeStatusServiceInterface } tests := []struct { name string @@ -25,12 +26,14 @@ func TestNewBlockchainProcessor(t *testing.T) { { name: "wantSuccess", args: args{ - blocksmith: &model.Blocksmith{}, - blockService: &service.BlockService{}, + blocksmith: &model.Blocksmith{}, + blockService: &service.BlockService{}, + blockTypeStatusService: service.NewBlockTypeStatusService(), }, want: &BlockchainProcessor{ - BlockService: &service.BlockService{}, - Generator: &model.Blocksmith{}, + BlockService: &service.BlockService{}, + Generator: &model.Blocksmith{}, + BlockTypeStatusService: service.NewBlockTypeStatusService(), }, }, } @@ -40,6 +43,7 @@ func TestNewBlockchainProcessor(t *testing.T) { tt.args.blocksmith, tt.args.blockService, tt.args.logger, + tt.args.blockTypeStatusService, ); !reflect.DeepEqual(got, tt.want) { t.Errorf("NewBlockchainProcessor() = %v, want %v", got, tt.want) } diff --git a/main.go b/main.go index 979b883a2..52b5cb9b8 100644 --- a/main.go +++ b/main.go @@ -46,47 +46,51 @@ import ( var ( dbPath, dbName, badgerDbPath, badgerDbName, nodeSecretPhrase, nodeKeyPath, nodeKeyFile, nodePreSeed, ownerAccountAddress, myAddress, nodeKeyFilePath, snapshotPath string - dbInstance *database.SqliteDB - badgerDbInstance *database.BadgerDB - db *sql.DB - badgerDb *badger.DB - apiRPCPort, apiHTTPPort, monitoringPort int - apiCertFile, apiKeyFile string - peerPort uint32 - p2pServiceInstance p2p.Peer2PeerServiceInterface - queryExecutor *query.Executor - kvExecutor *kvdb.KVExecutor - observerInstance *observer.Observer - schedulerInstance *util.Scheduler - blockServices = make(map[int32]service.BlockServiceInterface) - snapshotBlockServices = make(map[int32]service.SnapshotBlockServiceInterface) - mainchainBlockService *service.BlockService - mainBlockSnapshotChunkStrategy service.SnapshotChunkStrategyInterface - spinechainBlockService *service.BlockSpineService - fileDownloadService service.FileDownloaderServiceInterface - mempoolServices = make(map[int32]service.MempoolServiceInterface) - blockIncompleteQueueService service.BlockIncompleteQueueServiceInterface - receiptService service.ReceiptServiceInterface - peerServiceClient client.PeerServiceClientInterface - p2pHost *model.Host - peerExplorer p2pStrategy.PeerExplorerStrategyInterface - wellknownPeers []string - smithing, isNodePreSeed, isDebugMode bool - nodeRegistrationService service.NodeRegistrationServiceInterface - mainchainProcessor smith.BlockchainProcessorInterface - spinechainProcessor smith.BlockchainProcessorInterface - loggerAPIService *log.Logger - loggerCoreService *log.Logger - loggerP2PService *log.Logger - spinechainSynchronizer, mainchainSynchronizer *blockchainsync.Service - spineBlockManifestService service.SpineBlockManifestServiceInterface - spineBlockDownloadService service.SpineBlockDownloadServiceInterface - snapshotService service.SnapshotServiceInterface - transactionUtil = &transaction.Util{} - receiptUtil = &coreUtil.ReceiptUtil{} - transactionCoreServiceIns service.TransactionCoreServiceInterface - fileService service.FileServiceInterface - chainTypes = chaintype.GetChainTypes() + dbInstance *database.SqliteDB + badgerDbInstance *database.BadgerDB + db *sql.DB + badgerDb *badger.DB + apiRPCPort, apiHTTPPort, monitoringPort int + apiCertFile, apiKeyFile string + peerPort uint32 + p2pServiceInstance p2p.Peer2PeerServiceInterface + queryExecutor *query.Executor + kvExecutor *kvdb.KVExecutor + observerInstance *observer.Observer + schedulerInstance *util.Scheduler + blockServices = make(map[int32]service.BlockServiceInterface) + snapshotBlockServices = make(map[int32]service.SnapshotBlockServiceInterface) + mainchainBlockService *service.BlockService + mainBlockSnapshotChunkStrategy service.SnapshotChunkStrategyInterface + spinechainBlockService *service.BlockSpineService + fileDownloadService service.FileDownloaderServiceInterface + mempoolServices = make(map[int32]service.MempoolServiceInterface) + blockIncompleteQueueService service.BlockIncompleteQueueServiceInterface + receiptService service.ReceiptServiceInterface + peerServiceClient client.PeerServiceClientInterface + p2pHost *model.Host + peerExplorer p2pStrategy.PeerExplorerStrategyInterface + wellknownPeers []string + smithing, isNodePreSeed, isDebugMode bool + nodeRegistrationService service.NodeRegistrationServiceInterface + mainchainProcessor smith.BlockchainProcessorInterface + spinechainProcessor smith.BlockchainProcessorInterface + loggerAPIService *log.Logger + loggerCoreService *log.Logger + loggerP2PService *log.Logger + spinechainSynchronizer, mainchainSynchronizer *blockchainsync.Service + spineBlockManifestService service.SpineBlockManifestServiceInterface + snapshotService service.SnapshotServiceInterface + transactionUtil = &transaction.Util{} + receiptUtil = &coreUtil.ReceiptUtil{} + transactionCoreServiceIns service.TransactionCoreServiceInterface + fileService service.FileServiceInterface + chainTypes = chaintype.GetChainTypes() + mainchain = &chaintype.MainChain{} + spinechain = &chaintype.SpineChain{} + blockTypeStatusService service.BlockTypeStatusServiceInterface + mainchainDownloader, spinechainDownloader blockchainsync.BlockchainDownloadInterface + mainchainForkProcessor, spinechainForkProcessor blockchainsync.ForkingProcessorInterface ) func init() { @@ -138,7 +142,7 @@ func init() { query.NewAccountBalanceQuery(), query.NewNodeRegistrationQuery(), query.NewParticipationScoreQuery(), - query.NewBlockQuery(chainTypes[0]), + query.NewBlockQuery(mainchain), loggerCoreService, ) receiptService = service.NewReceiptService( @@ -146,7 +150,7 @@ func init() { query.NewBatchReceiptQuery(), query.NewMerkleTreeQuery(), query.NewNodeRegistrationQuery(), - query.NewBlockQuery(chainTypes[0]), + query.NewBlockQuery(mainchain), kvExecutor, queryExecutor, nodeRegistrationService, @@ -154,11 +158,11 @@ func init() { query.NewPublishedReceiptQuery(), receiptUtil, ) - spineBlockDownloadService = service.NewSpineBlockDownloadService() + blockTypeStatusService = service.NewBlockTypeStatusService() spineBlockManifestService = service.NewSpineBlockManifestService( queryExecutor, query.NewSpineBlockManifestQuery(), - query.NewBlockQuery(chainTypes[1]), + query.NewBlockQuery(spinechain), loggerCoreService, ) fileService = service.NewFileService( @@ -169,7 +173,7 @@ func init() { constant.SnapshotChunkSize, fileService, ) - snapshotBlockServices[chainTypes[0].GetTypeInt()] = service.NewSnapshotMainBlockService( + snapshotBlockServices[mainchain.GetTypeInt()] = service.NewSnapshotMainBlockService( snapshotPath, queryExecutor, loggerCoreService, @@ -180,7 +184,7 @@ func init() { query.NewAccountDatasetsQuery(), query.NewEscrowTransactionQuery(), query.NewPublishedReceiptQuery(), - query.GetSnapshotQuery(chainTypes[0]), + query.GetSnapshotQuery(mainchain), ) fileDownloadService = service.NewFileDownloaderService( @@ -190,7 +194,7 @@ func init() { ) snapshotService = service.NewSnapshotService( spineBlockManifestService, - spineBlockDownloadService, + blockTypeStatusService, snapshotBlockServices, fileDownloadService, fileService, @@ -199,7 +203,7 @@ func init() { transactionCoreServiceIns = service.NewTransactionCoreService( queryExecutor, - query.NewTransactionQuery(chainTypes[0]), + query.NewTransactionQuery(mainchain), query.NewEscrowTransactionQuery(), ) @@ -331,7 +335,7 @@ func initP2pInstance() { peerServiceClient, nodeRegistrationService, queryExecutor, - query.NewBlockQuery(chainTypes[0]), + query.NewBlockQuery(mainchain), loggerP2PService, ) p2pServiceInstance, _ = p2p.NewP2PService( @@ -408,18 +412,18 @@ func startMainchain() { err error sleepPeriod = 500 ) - monitoring.SetBlockchainStatus(chainTypes[0].GetTypeInt(), constant.BlockchainStatusIdle) + monitoring.SetBlockchainStatus(mainchain.GetTypeInt(), constant.BlockchainStatusIdle) mempoolService := service.NewMempoolService( transactionUtil, - chainTypes[0], + mainchain, kvExecutor, queryExecutor, - query.NewMempoolQuery(chainTypes[0]), + query.NewMempoolQuery(mainchain), query.NewMerkleTreeQuery(), &transaction.TypeSwitcher{Executor: queryExecutor}, query.NewAccountBalanceQuery(), - query.NewBlockQuery(chainTypes[0]), - query.NewTransactionQuery(chainTypes[0]), + query.NewBlockQuery(mainchain), + query.NewTransactionQuery(mainchain), crypto.NewSignature(), observerInstance, loggerCoreService, @@ -427,7 +431,7 @@ func startMainchain() { receiptService, transactionCoreServiceIns, ) - mempoolServices[chainTypes[0].GetTypeInt()] = mempoolService + mempoolServices[mainchain.GetTypeInt()] = mempoolService actionSwitcher := &transaction.TypeSwitcher{ Executor: queryExecutor, @@ -439,7 +443,7 @@ func startMainchain() { loggerCoreService, ) blockIncompleteQueueService = service.NewBlockIncompleteQueueService( - chainTypes[0], + mainchain, observerInstance, ) mainchainBlockPool := service.NewBlockPoolService() @@ -469,12 +473,12 @@ func startMainchain() { queryExecutor, ) mainchainBlockService = service.NewBlockMainService( - chainTypes[0], + mainchain, kvExecutor, queryExecutor, - query.NewBlockQuery(chainTypes[0]), - query.NewMempoolQuery(chainTypes[0]), - query.NewTransactionQuery(chainTypes[0]), + query.NewBlockQuery(mainchain), + query.NewMempoolQuery(mainchain), + query.NewTransactionQuery(mainchain), query.NewSkippedBlocksmithQuery(), crypto.NewSignature(), mempoolService, @@ -499,7 +503,7 @@ func startMainchain() { mainchainParticipationScoreService, mainchainPublishedReceiptService, ) - blockServices[chainTypes[0].GetTypeInt()] = mainchainBlockService + blockServices[mainchain.GetTypeInt()] = mainchainBlockService if !mainchainBlockService.CheckGenesis() { // Add genesis if not exist // genesis account will be inserted in the very beginning @@ -543,23 +547,41 @@ func startMainchain() { model.NewBlocksmith(nodeSecretPhrase, nodePublicKey, node.NodeID), mainchainBlockService, loggerCoreService, + blockTypeStatusService, ) mainchainProcessor.Start(sleepPeriod) } } - mainchainSynchronizer = blockchainsync.NewBlockchainSyncService( + mainchainDownloader = blockchainsync.NewBlockchainDownloader( mainchainBlockService, - peerServiceClient, peerExplorer, - queryExecutor, mempoolService, - actionSwitcher, + peerServiceClient, + peerExplorer, loggerCoreService, - kvExecutor, - transactionUtil, - service.NewTransactionCoreService( + blockTypeStatusService, + ) + mainchainForkProcessor = &blockchainsync.ForkingProcessor{ + ChainType: mainchainBlockService.GetChainType(), + BlockService: mainchainBlockService, + QueryExecutor: queryExecutor, + ActionTypeSwitcher: actionSwitcher, + MempoolService: mempoolService, + KVExecutor: kvExecutor, + PeerExplorer: peerExplorer, + Logger: loggerCoreService, + TransactionUtil: transactionUtil, + TransactionCorService: service.NewTransactionCoreService( queryExecutor, - query.NewTransactionQuery(chainTypes[0]), + query.NewTransactionQuery(mainchain), query.NewEscrowTransactionQuery(), ), + } + mainchainSynchronizer = blockchainsync.NewBlockchainSyncService( + mainchainBlockService, + peerServiceClient, peerExplorer, + loggerCoreService, + blockTypeStatusService, + mainchainDownloader, + mainchainForkProcessor, ) } @@ -567,18 +589,18 @@ func startSpinechain() { var ( nodeID int64 ) - monitoring.SetBlockchainStatus(chainTypes[1].GetTypeInt(), constant.BlockchainStatusIdle) + monitoring.SetBlockchainStatus(spinechain.GetTypeInt(), constant.BlockchainStatusIdle) sleepPeriod := 500 blocksmithStrategySpine := blockSmithStrategy.NewBlocksmithStrategySpine( queryExecutor, query.NewSpinePublicKeyQuery(), loggerCoreService, - query.NewBlockQuery(chainTypes[1]), + query.NewBlockQuery(spinechain), ) spinechainBlockService = service.NewBlockSpineService( - chainTypes[1], + spinechain, queryExecutor, - query.NewBlockQuery(chainTypes[1]), + query.NewBlockQuery(spinechain), query.NewSpinePublicKeyQuery(), crypto.NewSignature(), query.NewNodeRegistrationQuery(), @@ -587,7 +609,7 @@ func startSpinechain() { loggerCoreService, query.NewSpineBlockManifestQuery(), ) - blockServices[chainTypes[1].GetTypeInt()] = spinechainBlockService + blockServices[spinechain.GetTypeInt()] = spinechainBlockService if !spinechainBlockService.CheckGenesis() { // Add genesis if not exist if err := spinechainBlockService.AddGenesis(); err != nil { @@ -605,27 +627,48 @@ func startSpinechain() { model.NewBlocksmith(nodeSecretPhrase, nodePublicKey, nodeID), spinechainBlockService, loggerCoreService, + blockTypeStatusService, ) spinechainProcessor.Start(sleepPeriod) } + spinechainDownloader = blockchainsync.NewBlockchainDownloader( + spinechainBlockService, + peerServiceClient, + peerExplorer, + loggerCoreService, + blockTypeStatusService, + ) + spinechainForkProcessor = &blockchainsync.ForkingProcessor{ + ChainType: spinechainBlockService.GetChainType(), + BlockService: spinechainBlockService, + QueryExecutor: queryExecutor, + ActionTypeSwitcher: nil, // no mempool for spine blocks + MempoolService: nil, // no transaction types for spine blocks + KVExecutor: kvExecutor, + PeerExplorer: peerExplorer, + Logger: loggerCoreService, + TransactionUtil: transactionUtil, + TransactionCorService: service.NewTransactionCoreService( + queryExecutor, + query.NewTransactionQuery(mainchain), + query.NewEscrowTransactionQuery(), + ), + } spinechainSynchronizer = blockchainsync.NewBlockchainSyncService( spinechainBlockService, peerServiceClient, peerExplorer, - queryExecutor, - nil, // no mempool for spine blocks - nil, // no transaction types for spine blocks loggerCoreService, - kvExecutor, - transactionUtil, - transactionCoreServiceIns, + blockTypeStatusService, + spinechainDownloader, + spinechainForkProcessor, ) } // Scheduler Init func startScheduler() { var ( - mainchainMempoolService = mempoolServices[chainTypes[0].GetTypeInt()] + mainchainMempoolService = mempoolServices[mainchain.GetTypeInt()] ) // scheduler remove expired mempool transaction if err := schedulerInstance.AddJob( @@ -655,7 +698,7 @@ func startScheduler() { ); err != nil { loggerCoreService.Error("Scheduler Err: ", err.Error()) } - // register scan block pool for chainTypes[0] + // register scan block pool for mainchain if err := schedulerInstance.AddJob( constant.BlockPoolScanPeriod, mainchainBlockService.ScanBlockPool, @@ -673,7 +716,7 @@ func startScheduler() { func startBlockchainSyncronizers() { go spinechainSynchronizer.Start() - ticker := time.NewTicker(constant.BlockchainsyncSpineCheckInterval) + ticker := time.NewTicker(constant.BlockchainsyncCheckInterval) timeout := time.After(constant.BlockchainsyncSpineTimeout) syncronizersLoop: for { @@ -684,37 +727,49 @@ syncronizersLoop: loggerCoreService.Errorf("cannot get last spine block") os.Exit(1) } - if spinechainSynchronizer.BlockchainDownloader.IsDownloadFinish(lastSpineBlock) { - spineBlockDownloadService.SetSpineBlocksDownloadFinished(true) + if blockTypeStatusService.IsFirstDownloadFinished(spinechain) { ticker.Stop() // loop through all chain types that support snapshots and download them if we find relative // spineBlockManifest for i := 0; i < len(chainTypes); i++ { ct := chaintype.GetChainType(int32(i)) - lastSpineBlockManifest, err := spineBlockManifestService.GetLastSpineBlockManifest(ct, - model.SpineBlockManifestType_Snapshot) + // exclude spinechain + if i == int(spinechain.GetTypeInt()) { + continue + } + + lastMainBlock, err := mainchainSynchronizer.BlockService.GetLastBlock() if err != nil { - loggerCoreService.Errorf("db error: cannot get last spineBlockManifest for chaintype %s", - ct.GetName()) - break + loggerCoreService.Errorf("cannot get last main block") + os.Exit(1) } - if lastSpineBlockManifest != nil { - loggerCoreService.Infof("found spineBlockManifest for chaintype %s at spine height %d. "+ - "snapshot taken at block height %d", ct.GetName(), lastSpineBlock.Height, - lastSpineBlockManifest.SpineBlockManifestHeight) + // only download/apply snapshots first time a node joins the network (for now) + if lastMainBlock.Height == 0 { // snapshot download - if err := snapshotService.DownloadSnapshot(lastSpineBlockManifest); err != nil { - loggerCoreService.Info(err) + lastSpineBlockManifest, err := spineBlockManifestService.GetLastSpineBlockManifest(ct, + model.SpineBlockManifestType_Snapshot) + if err != nil { + loggerCoreService.Errorf("db error: cannot get last spineBlockManifest for chaintype %s", + ct.GetName()) + break + } + if lastSpineBlockManifest != nil { + loggerCoreService.Infof("found spineBlockManifest for chaintype %s at spine height %d. "+ + "snapshot taken at block height %d", ct.GetName(), lastSpineBlock.Height, + lastSpineBlockManifest.SpineBlockManifestHeight) + if err := snapshotService.DownloadSnapshot(ct, lastSpineBlockManifest); err != nil { + loggerCoreService.Info(err) + } } } - // download remaining main blocks and start the chainTypes[0] synchronizer + // download remaining main blocks and start the mainchain synchronizer // TODO: generalise this so that we can just inject the chaintype and will start the correct // syncronizer switch ct.(type) { case *chaintype.MainChain: go mainchainSynchronizer.Start() default: - loggerCoreService.Errorf("invalid chaintype %s", ct.GetName()) + loggerCoreService.Debug("invalid chaintype for snapshot") } } break syncronizersLoop