Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions chainntnfs/bitcoindnotify/bitcoind.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,20 @@ func (b *BitcoindNotifier) startNotifier() error {
if err != nil {
return err
}
blockHeader, err := b.chainConn.GetBlockHeader(currentHash)
if err != nil {
return err
}

b.txNotifier = chainntnfs.NewTxNotifier(
uint32(currentHeight), chainntnfs.ReorgSafetyLimit,
b.confirmHintCache, b.spendHintCache,
)

b.bestBlock = chainntnfs.BlockEpoch{
Height: currentHeight,
Hash: currentHash,
Height: currentHeight,
Hash: currentHash,
BlockHeader: blockHeader,
}

b.wg.Add(1)
Expand Down Expand Up @@ -322,6 +327,7 @@ out:
b.notifyBlockEpochClient(
msg, b.bestBlock.Height,
b.bestBlock.Hash,
b.bestBlock.BlockHeader,
)

msg.errorChan <- nil
Expand All @@ -343,6 +349,7 @@ out:
for _, block := range missedBlocks {
b.notifyBlockEpochClient(
msg, block.Height, block.Hash,
block.BlockHeader,
)
}

Expand Down Expand Up @@ -392,8 +399,9 @@ out:
}

newBlock := chainntnfs.BlockEpoch{
Height: item.Height,
Hash: &item.Hash,
Height: item.Height,
Hash: &item.Hash,
BlockHeader: blockHeader,
}
if err := b.handleBlockConnected(newBlock); err != nil {
chainntnfs.Log.Error(err)
Expand Down Expand Up @@ -589,26 +597,29 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err
// satisfy any client requests based upon the new block.
b.bestBlock = block

b.notifyBlockEpochs(block.Height, block.Hash)
b.notifyBlockEpochs(block.Height, block.Hash, block.BlockHeader)
return b.txNotifier.NotifyHeight(uint32(block.Height))
}

// notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain.
func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash,
blockHeader *wire.BlockHeader) {

for _, client := range b.blockEpochClients {
b.notifyBlockEpochClient(client, newHeight, newSha)
b.notifyBlockEpochClient(client, newHeight, newSha, blockHeader)
}
}

// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (b *BitcoindNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
height int32, sha *chainhash.Hash) {
height int32, sha *chainhash.Hash, header *wire.BlockHeader) {

epoch := &chainntnfs.BlockEpoch{
Height: height,
Hash: sha,
Height: height,
Hash: sha,
BlockHeader: header,
}

select {
Expand Down
44 changes: 32 additions & 12 deletions chainntnfs/btcdnotify/btcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,22 @@ func (b *BtcdNotifier) startNotifier() error {
return err
}

bestBlock, err := b.chainConn.GetBlock(currentHash)
if err != nil {
b.txUpdates.Stop()
b.chainUpdates.Stop()
return err
}

b.txNotifier = chainntnfs.NewTxNotifier(
uint32(currentHeight), chainntnfs.ReorgSafetyLimit,
b.confirmHintCache, b.spendHintCache,
)

b.bestBlock = chainntnfs.BlockEpoch{
Height: currentHeight,
Hash: currentHash,
Height: currentHeight,
Hash: currentHash,
BlockHeader: &bestBlock.Header,
}

if err := b.chainConn.NotifyBlocks(); err != nil {
Expand Down Expand Up @@ -375,6 +383,7 @@ out:
b.notifyBlockEpochClient(
msg, b.bestBlock.Height,
b.bestBlock.Hash,
b.bestBlock.BlockHeader,
)

msg.errorChan <- nil
Expand All @@ -396,6 +405,7 @@ out:
for _, block := range missedBlocks {
b.notifyBlockEpochClient(
msg, block.Height, block.Hash,
block.BlockHeader,
)
}

Expand All @@ -405,8 +415,9 @@ out:
case item := <-b.chainUpdates.ChanOut():
update := item.(*chainUpdate)
if update.connect {
blockHeader, err :=
b.chainConn.GetBlockHeader(update.blockHash)
blockHeader, err := b.chainConn.GetBlockHeader(
update.blockHash,
)
if err != nil {
chainntnfs.Log.Errorf("Unable to fetch "+
"block header: %v", err)
Expand Down Expand Up @@ -445,8 +456,9 @@ out:
}

newBlock := chainntnfs.BlockEpoch{
Height: update.blockHeight,
Hash: update.blockHash,
Height: update.blockHeight,
Hash: update.blockHash,
BlockHeader: blockHeader,
}
if err := b.handleBlockConnected(newBlock); err != nil {
chainntnfs.Log.Error(err)
Expand Down Expand Up @@ -654,26 +666,34 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
// satisfy any client requests based upon the new block.
b.bestBlock = epoch

b.notifyBlockEpochs(epoch.Height, epoch.Hash)
b.notifyBlockEpochs(
epoch.Height, epoch.Hash, epoch.BlockHeader,
)

return b.txNotifier.NotifyHeight(uint32(epoch.Height))
}

// notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain.
func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32,
newSha *chainhash.Hash, blockHeader *wire.BlockHeader) {

for _, client := range b.blockEpochClients {
b.notifyBlockEpochClient(client, newHeight, newSha)
b.notifyBlockEpochClient(
client, newHeight, newSha, blockHeader,
)
}
}

// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (b *BtcdNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
height int32, sha *chainhash.Hash) {
height int32, sha *chainhash.Hash, blockHeader *wire.BlockHeader) {

epoch := &chainntnfs.BlockEpoch{
Height: height,
Hash: sha,
Height: height,
Hash: sha,
BlockHeader: blockHeader,
}

select {
Expand Down
41 changes: 33 additions & 8 deletions chainntnfs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ type BlockEpoch struct {
// Height is the height of the latest block to be added to the tip of
// the main chain.
Height int32

// BlockHeader is the block header of this new height.
BlockHeader *wire.BlockHeader
}

// BlockEpochEvent encapsulates an on-going stream of block epoch
Expand Down Expand Up @@ -489,8 +492,9 @@ func RewindChain(chainConn ChainConn, txNotifier *TxNotifier,
currBestBlock BlockEpoch, targetHeight int32) (BlockEpoch, error) {

newBestBlock := BlockEpoch{
Height: currBestBlock.Height,
Hash: currBestBlock.Hash,
Height: currBestBlock.Height,
Hash: currBestBlock.Hash,
BlockHeader: currBestBlock.BlockHeader,
}

for height := currBestBlock.Height; height > targetHeight; height-- {
Expand All @@ -500,6 +504,11 @@ func RewindChain(chainConn ChainConn, txNotifier *TxNotifier,
"find blockhash for disconnected height=%d: %v",
height, err)
}
header, err := chainConn.GetBlockHeader(hash)
if err != nil {
return newBestBlock, fmt.Errorf("unable to get block "+
"header for height=%v", height-1)
}

Log.Infof("Block disconnected from main chain: "+
"height=%v, sha=%v", height, newBestBlock.Hash)
Expand All @@ -512,7 +521,9 @@ func RewindChain(chainConn ChainConn, txNotifier *TxNotifier,
}
newBestBlock.Height = height - 1
newBestBlock.Hash = hash
newBestBlock.BlockHeader = header
}

return newBestBlock, nil
}

Expand All @@ -536,8 +547,9 @@ func HandleMissedBlocks(chainConn ChainConn, txNotifier *TxNotifier,
// If a reorg causes our best hash to be incorrect, rewind the
// chain so our best block is set to the closest common
// ancestor, then dispatch notifications from there.
hashAtBestHeight, err :=
chainConn.GetBlockHash(int64(currBestBlock.Height))
hashAtBestHeight, err := chainConn.GetBlockHash(
int64(currBestBlock.Height),
)
if err != nil {
return currBestBlock, nil, fmt.Errorf("unable to find "+
"blockhash for height=%d: %v",
Expand All @@ -552,8 +564,9 @@ func HandleMissedBlocks(chainConn ChainConn, txNotifier *TxNotifier,
"common ancestor: %v", err)
}

currBestBlock, err = RewindChain(chainConn, txNotifier,
currBestBlock, startingHeight)
currBestBlock, err = RewindChain(
chainConn, txNotifier, currBestBlock, startingHeight,
)
if err != nil {
return currBestBlock, nil, fmt.Errorf("unable to "+
"rewind chain: %v", err)
Expand Down Expand Up @@ -589,8 +602,20 @@ func getMissedBlocks(chainConn ChainConn, startingHeight,
return nil, fmt.Errorf("unable to find blockhash for "+
"height=%d: %v", height, err)
}
missedBlocks = append(missedBlocks,
BlockEpoch{Hash: hash, Height: height})
header, err := chainConn.GetBlockHeader(hash)
if err != nil {
return nil, fmt.Errorf("unable to find block header "+
"for height=%d: %v", height, err)
}

missedBlocks = append(
missedBlocks,
BlockEpoch{
Hash: hash,
Height: height,
BlockHeader: header,
},
)
}

return missedBlocks, nil
Expand Down
33 changes: 27 additions & 6 deletions chainntnfs/neutrinonotify/neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,18 @@ func (n *NeutrinoNotifier) startNotifier() error {
n.chainUpdates.Stop()
return err
}
startingHeader, err := n.p2pNode.GetBlockHeader(
&startingPoint.Hash,
)
if err != nil {
n.txUpdates.Stop()
n.chainUpdates.Stop()
return err
}

n.bestBlock.Hash = &startingPoint.Hash
n.bestBlock.Height = startingPoint.Height
n.bestBlock.BlockHeader = startingHeader

n.txNotifier = chainntnfs.NewTxNotifier(
uint32(n.bestBlock.Height), chainntnfs.ReorgSafetyLimit,
Expand Down Expand Up @@ -226,6 +236,7 @@ func (n *NeutrinoNotifier) startNotifier() error {
// includes a transaction that confirmed one of our watched txids, or spends
// one of the outputs currently being watched.
type filteredBlock struct {
header *wire.BlockHeader
hash chainhash.Hash
height uint32
txns []*btcutil.Tx
Expand Down Expand Up @@ -255,6 +266,7 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
hash: header.BlockHash(),
height: uint32(height),
txns: txns,
header: header,
connect: true,
}:
case <-n.quit:
Expand Down Expand Up @@ -374,6 +386,7 @@ out:
n.notifyBlockEpochClient(
msg, n.bestBlock.Height,
n.bestBlock.Hash,
n.bestBlock.BlockHeader,
)

msg.errorChan <- nil
Expand All @@ -399,6 +412,7 @@ out:
for _, block := range missedBlocks {
n.notifyBlockEpochClient(
msg, block.Height, block.Hash,
block.BlockHeader,
)
}

Expand Down Expand Up @@ -629,8 +643,11 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// satisfy any client requests based upon the new block.
n.bestBlock.Hash = &newBlock.hash
n.bestBlock.Height = int32(newBlock.height)
n.bestBlock.BlockHeader = newBlock.header

n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
n.notifyBlockEpochs(
int32(newBlock.height), &newBlock.hash, newBlock.header,
)
return n.txNotifier.NotifyHeight(newBlock.height)
}

Expand All @@ -646,6 +663,7 @@ func (n *NeutrinoNotifier) getFilteredBlock(epoch chainntnfs.BlockEpoch) (*filte
block := &filteredBlock{
hash: *epoch.Hash,
height: uint32(epoch.Height),
header: &rawBlock.MsgBlock().Header,
txns: txns,
connect: true,
}
Expand All @@ -654,20 +672,23 @@ func (n *NeutrinoNotifier) getFilteredBlock(epoch chainntnfs.BlockEpoch) (*filte

// notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain.
func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash,
blockHeader *wire.BlockHeader) {

for _, client := range n.blockEpochClients {
n.notifyBlockEpochClient(client, newHeight, newSha)
n.notifyBlockEpochClient(client, newHeight, newSha, blockHeader)
}
}

// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (n *NeutrinoNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
height int32, sha *chainhash.Hash) {
height int32, sha *chainhash.Hash, blockHeader *wire.BlockHeader) {

epoch := &chainntnfs.BlockEpoch{
Height: height,
Hash: sha,
Height: height,
Hash: sha,
BlockHeader: blockHeader,
}

select {
Expand Down
Loading