Skip to content
Merged
15 changes: 15 additions & 0 deletions cmd/block/blockGenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (
"github.com/zoobc/zoobc-core/observer"
)

type (
mockBlockTypeStatusService struct {
service.BlockTypeStatusService
}
)

var (
blocksmith *model.Blocksmith
chainType chaintype.ChainType
Expand Down Expand Up @@ -52,6 +58,14 @@ var (
}
)

func (*mockBlockTypeStatusService) IsFirstDownloadFinished(ct chaintype.ChainType) bool {
return true
}

func (*mockBlockTypeStatusService) IsDownloading(ct chaintype.ChainType) bool {
return true
}

func init() {
fakeBlockCmd.Flags().IntVar(
&numberOfBlocks,
Expand Down Expand Up @@ -194,6 +208,7 @@ func generateBlocks(numberOfBlocks int, blocksmithSecretPhrase, outputPath strin
blocksmith,
blockService,
log.New(),
&mockBlockTypeStatusService{},
)
startTime := time.Now().UnixNano() / 1e6
fmt.Printf("generating %d blocks\n", numberOfBlocks)
Expand Down
1 change: 1 addition & 0 deletions common/blocker/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
ServerError TypeBlocker = "ServerError"
SmithingErr TypeBlocker = "SmithingErr"
ChainValidationErr TypeBlocker = "ChainValidationErr"
P2PNetworkConnectionErr TypeBlocker = "P2PNetworkConnectionErr"
)

func NewBlocker(typeBlocker TypeBlocker, message string) error {
Expand Down
6 changes: 3 additions & 3 deletions common/constant/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ const (
// PriorityStrategyMaxStayedInUnresolvedPeers max time a peer can stay before being cycled out from unresolved peers
PriorityStrategyMaxStayedInUnresolvedPeers int64 = 120
// BlockchainsyncWaitingTime time, in seconds, to wait before start syncing the blockchain
BlockchainsyncWaitingTime time.Duration = 3 * time.Second
// BlockchainsyncSpineCheckInterval time, in seconds, between checks if spine blocks have finished to be downloaded
BlockchainsyncSpineCheckInterval time.Duration = 3 * time.Second
BlockchainsyncWaitingTime time.Duration = 5 * time.Second
// BlockchainsyncCheckInterval time, in seconds, between checks if spine blocks have finished to be downloaded
BlockchainsyncCheckInterval time.Duration = 10 * time.Second
// BlockchainsyncSpineTimeout timeout, in seconds, for spine blocks to be downloaded from the network
// FIXME: this is for debugging purposes only and must higher on production,
// where downloading the spine blocks could take longer than 30 minutes
Expand Down
80 changes: 38 additions & 42 deletions core/blockchainsync/blockchainSync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package blockchainsync

import (
"fmt"
"github.com/zoobc/zoobc-core/common/blocker"
"time"

log "github.com/sirupsen/logrus"
"github.com/zoobc/zoobc-core/common/chaintype"
"github.com/zoobc/zoobc-core/common/constant"
"github.com/zoobc/zoobc-core/common/kvdb"
"github.com/zoobc/zoobc-core/common/model"
"github.com/zoobc/zoobc-core/common/query"
"github.com/zoobc/zoobc-core/common/transaction"
"github.com/zoobc/zoobc-core/core/service"
"github.com/zoobc/zoobc-core/p2p/client"
Expand All @@ -19,53 +18,35 @@ import (
// TODO: rename into something more specific, such as SyncService
type Service struct {
// isScanningBlockchain bool
ChainType chaintype.ChainType
PeerServiceClient client.PeerServiceClientInterface
PeerExplorer strategy.PeerExplorerStrategyInterface
BlockService service.BlockServiceInterface
BlockchainDownloader BlockchainDownloadInterface
ForkingProcessor ForkingProcessorInterface
Logger *log.Logger
TransactionUtil transaction.UtilInterface
ChainType chaintype.ChainType
PeerServiceClient client.PeerServiceClientInterface
PeerExplorer strategy.PeerExplorerStrategyInterface
BlockService service.BlockServiceInterface
BlockchainDownloader BlockchainDownloadInterface
ForkingProcessor ForkingProcessorInterface
Logger *log.Logger
TransactionUtil transaction.UtilInterface
BlockTypeStatusService service.BlockTypeStatusServiceInterface
}

func NewBlockchainSyncService(
blockService service.BlockServiceInterface,
peerServiceClient client.PeerServiceClientInterface,
peerExplorer strategy.PeerExplorerStrategyInterface,
queryExecutor query.ExecutorInterface,
mempoolService service.MempoolServiceInterface,
txActionSwitcher transaction.TypeActionSwitcher,
logger *log.Logger,
kvdb kvdb.KVExecutorInterface,
transactionUtil transaction.UtilInterface,
transactionCoreService service.TransactionCoreServiceInterface,
blockTypeStatusService service.BlockTypeStatusServiceInterface,
blockchainDownloader BlockchainDownloadInterface,
forkingProcessor ForkingProcessorInterface,
) *Service {
return &Service{
ChainType: blockService.GetChainType(),
BlockService: blockService,
PeerServiceClient: peerServiceClient,
PeerExplorer: peerExplorer,
BlockchainDownloader: &BlockchainDownloader{
ChainType: blockService.GetChainType(),
BlockService: blockService,
PeerServiceClient: peerServiceClient,
PeerExplorer: peerExplorer,
Logger: logger,
},
ForkingProcessor: &ForkingProcessor{
ChainType: blockService.GetChainType(),
BlockService: blockService,
QueryExecutor: queryExecutor,
ActionTypeSwitcher: txActionSwitcher,
MempoolService: mempoolService,
KVExecutor: kvdb,
PeerExplorer: peerExplorer,
Logger: logger,
TransactionUtil: transactionUtil,
TransactionCorService: transactionCoreService,
},
Logger: logger,
ChainType: blockService.GetChainType(),
BlockService: blockService,
PeerServiceClient: peerServiceClient,
PeerExplorer: peerExplorer,
BlockchainDownloader: blockchainDownloader,
ForkingProcessor: forkingProcessor,
Logger: logger,
BlockTypeStatusService: blockTypeStatusService,
}
}

Expand Down Expand Up @@ -120,8 +101,23 @@ func (bss *Service) getMoreBlocks() {
needDownloadBlock := true
peerBlockchainInfo, err = bss.BlockchainDownloader.GetPeerBlockchainInfo()
if err != nil {
bss.Logger.Infof("\nfailed to getPeerBlockchainInfo: %v\n\n", err)
needDownloadBlock = false
errCasted := err.(blocker.Blocker)
switch errCasted.Type {
case blocker.P2PNetworkConnectionErr:
// this will allow the node to start smithing if it fails to connect to the p2p network,
// eg. he is the first node. if later on he can connect, it will try resolve the fork normally
bss.BlockTypeStatusService.SetFirstDownloadFinished(bss.ChainType, true)
bss.Logger.Info(errCasted.Message)
case blocker.ChainValidationErr:
bss.Logger.Infof("peer %s:%d: %s",
peerBlockchainInfo.Peer.GetInfo().Address,
peerBlockchainInfo.Peer.GetInfo().Port,
errCasted.Message)
default:
bss.Logger.Infof("failed to getPeerBlockchainInfo: %v", err)

}
}

newLastBlock = nil
Expand Down Expand Up @@ -177,7 +173,7 @@ func (bss *Service) getMoreBlocks() {
}

if bss.BlockchainDownloader.IsDownloadFinish(lastBlock) {
bss.BlockchainDownloader.SetIsDownloading(false)
bss.BlockTypeStatusService.SetIsDownloading(bss.ChainType, false)
bss.Logger.Infof("Finished %s blockchain download: %d blocks pulled", bss.ChainType.GetName(), lastBlock.Height-initialHeight)
break
}
Expand Down
76 changes: 53 additions & 23 deletions core/blockchainsync/downloadBlockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,19 @@ import (

type (
BlockchainDownloadInterface interface {
SetIsDownloading(newValue bool)
IsDownloadFinish(currentLastBlock *model.Block) bool
GetPeerBlockchainInfo() (*PeerBlockchainInfo, error)
DownloadFromPeer(feederPeer *model.Peer, chainBlockIds []int64, commonBlock *model.Block) (*PeerForkInfo, error)
ConfirmWithPeer(peerToCheck *model.Peer, commonMilestoneBlockID int64) ([]int64, error)
}
BlockchainDownloader struct {
IsDownloading bool // only for status
PeerHasMore bool
ChainType chaintype.ChainType
BlockService service.BlockServiceInterface
PeerServiceClient client.PeerServiceClientInterface
PeerExplorer strategy.PeerExplorerStrategyInterface
Logger *log.Logger
PeerHasMore bool
ChainType chaintype.ChainType
BlockService service.BlockServiceInterface
PeerServiceClient client.PeerServiceClientInterface
PeerExplorer strategy.PeerExplorerStrategyInterface
Logger *log.Logger
BlockTypeStatusService service.BlockTypeStatusServiceInterface
}

PeerBlockchainInfo struct {
Expand All @@ -49,6 +48,23 @@ type (
}
)

func NewBlockchainDownloader(
blockService service.BlockServiceInterface,
peerServiceClient client.PeerServiceClientInterface,
peerExplorer strategy.PeerExplorerStrategyInterface,
logger *log.Logger,
blockTypeStatusService service.BlockTypeStatusServiceInterface,
) *BlockchainDownloader {
return &BlockchainDownloader{
ChainType: blockService.GetChainType(),
BlockService: blockService,
PeerServiceClient: peerServiceClient,
PeerExplorer: peerExplorer,
Logger: logger,
BlockTypeStatusService: blockTypeStatusService,
}
}

func (bd *BlockchainDownloader) IsDownloadFinish(currentLastBlock *model.Block) bool {
currentHeight := currentLastBlock.Height
currentCumulativeDifficulty := currentLastBlock.CumulativeDifficulty
Expand All @@ -60,15 +76,13 @@ func (bd *BlockchainDownloader) IsDownloadFinish(currentLastBlock *model.Block)
heightAfterDownload := afterDownloadLastBlock.Height
cumulativeDifficultyAfterDownload := afterDownloadLastBlock.CumulativeDifficulty
if currentHeight > 0 && currentHeight == heightAfterDownload && currentCumulativeDifficulty == cumulativeDifficultyAfterDownload {
// we only initialize this flag (to false) in main, so once is set to true, it will always be true
bd.BlockTypeStatusService.SetFirstDownloadFinished(bd.ChainType, true)
return true
}
return false
}

func (bd *BlockchainDownloader) SetIsDownloading(newValue bool) {
bd.IsDownloading = newValue
}

func (bd *BlockchainDownloader) GetPeerBlockchainInfo() (*PeerBlockchainInfo, error) {
var (
err error
Expand All @@ -79,28 +93,35 @@ func (bd *BlockchainDownloader) GetPeerBlockchainInfo() (*PeerBlockchainInfo, er
bd.PeerHasMore = true
peer := bd.PeerExplorer.GetAnyResolvedPeer()
if peer == nil {
return nil, errors.New("no connected peer can be found")
return nil, blocker.NewBlocker(blocker.P2PNetworkConnectionErr, "no connected peer can be found")
}
peerCumulativeDifficultyResponse, err = bd.PeerServiceClient.GetCumulativeDifficulty(peer, bd.ChainType)
if err != nil {
return nil, fmt.Errorf("failed to get Cumulative Difficulty of peer %v: %v", peer.Info.Address, err)
return &PeerBlockchainInfo{
Peer: peer,
CommonBlock: commonBlock,
}, blocker.NewBlocker(blocker.ChainValidationErr,
fmt.Sprintf("failed to get Cumulative Difficulty of peer %v: %v", peer.Info.Address, err))
}

peerCumulativeDifficulty, _ := new(big.Int).SetString(peerCumulativeDifficultyResponse.CumulativeDifficulty, 10)
peerHeight := peerCumulativeDifficultyResponse.Height

lastBlock, err = bd.BlockService.GetLastBlock()
if err != nil {
return nil, err
return nil, blocker.NewBlocker(blocker.DBErr, err.Error())
}
lastBlockCumulativeDifficulty, _ := new(big.Int).SetString(lastBlock.CumulativeDifficulty, 10)
lastBlockHeight := lastBlock.Height
lastBlockID := lastBlock.ID

if peerCumulativeDifficulty == nil || lastBlockCumulativeDifficulty == nil ||
peerCumulativeDifficulty.Cmp(lastBlockCumulativeDifficulty) <= 0 {
return nil, fmt.Errorf("peer's cumulative difficulty %s:%v is lower/same with the current node's",
peer.GetInfo().Address, peer.GetInfo().Port)
return &PeerBlockchainInfo{
Peer: peer,
CommonBlock: commonBlock,
}, blocker.NewBlocker(blocker.ChainValidationErr,
"cumulative difficulty is lower/same with the current node's")
}

commonMilestoneBlockID := bd.ChainType.GetGenesisBlockID()
Expand All @@ -113,22 +134,31 @@ func (bd *BlockchainDownloader) GetPeerBlockchainInfo() (*PeerBlockchainInfo, er

chainBlockIds := bd.getBlockIdsAfterCommon(peer, commonMilestoneBlockID)
if len(chainBlockIds) < 2 || !bd.PeerHasMore {
return nil, errors.New("the peer does not have more updated chain")
return &PeerBlockchainInfo{
Peer: peer,
CommonBlock: commonBlock,
}, blocker.NewBlocker(blocker.ChainValidationErr, "the peer does not have more updated chain")
}

commonBlockID := chainBlockIds[0]
commonBlock, err = bd.BlockService.GetBlockByID(commonBlockID, false)
if err != nil {
bd.Logger.Infof("common block %v not found, milestone block id: %v", commonBlockID, commonMilestoneBlockID)
return nil, err
return &PeerBlockchainInfo{
Peer: peer,
CommonBlock: commonBlock,
}, blocker.NewBlocker(blocker.ChainValidationErr, fmt.Sprintf("common block %v not found, milestone block id: %v",
commonBlockID, commonMilestoneBlockID))
}
if commonBlock == nil || lastBlockHeight-commonBlock.GetHeight() >= constant.MinRollbackBlocks {
return nil, errors.New("invalid common block")
return &PeerBlockchainInfo{
Peer: peer,
CommonBlock: commonBlock,
}, blocker.NewBlocker(blocker.ChainValidationErr, "invalid common block")
}

if !bd.IsDownloading && peerHeight-commonBlock.GetHeight() > 10 {
if !bd.BlockTypeStatusService.IsDownloading(bd.ChainType) && peerHeight-commonBlock.GetHeight() > 10 {
bd.Logger.Info("Blockchain download in progress")
bd.IsDownloading = true
bd.BlockTypeStatusService.SetIsDownloading(bd.ChainType, true)
}

return &PeerBlockchainInfo{
Expand Down
Loading