Skip to content

Portal: Implementation of finalized history network WIP #3427

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions portal/network/finalized_history/content/content_keys.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Nimbus
# Copyright (c) 2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import
nimcrypto/[sha2, hash],
results,
stint,
ssz_serialization,
../../../common/common_types

export ssz_serialization, common_types, results#, hash

type
ContentType* = enum
# Note: Need to add this unused value as a case object with an enum without
# a 0 valueis not allowed: "low(contentType) must be 0 for discriminant".
# For prefix values that are in the enum gap, the deserialization will fail
# at runtime as is wanted.
# In the future it might be possible that this will fail at compile time for
# the SSZ Union type, but currently it is allowed in the implementation, and
# the SSZ spec is not explicit about disallowing this.
unused = 0x00
blockBody = 0x09
receipts = 0x0A

BlockNumberKey* = object
blockNumber*: uint64

ContentKey* = object
case contentType*: ContentType
of unused:
discard
of blockBody:
blockBodyKey*: BlockNumberKey
of receipts:
receiptsKey*: BlockNumberKey

func blockBodyContentKey*(blockNumber: uint64): ContentKey =
ContentKey(contentType: blockBody, blockBodyKey: BlockNumberKey(blockNumber: blockNumber))

func receiptsContentKey*(blockNumber: uint64): ContentKey =
ContentKey(contentType: receipts, receiptsKey: BlockNumberKey(blockNumber: blockNumber))

proc readSszBytes*(data: openArray[byte], val: var ContentKey) {.raises: [SszError].} =
mixin readSszValue
if data.len() > 0 and data[0] == ord(unused):
raise newException(MalformedSszError, "SSZ selector is unused value")

readSszValue(data, val)

func encode*(contentKey: ContentKey): ContentKeyByteList =
doAssert(contentKey.contentType != unused)
ContentKeyByteList.init(SSZ.encode(contentKey))

func decode*(contentKey: ContentKeyByteList): Opt[ContentKey] =
try:
Opt.some(SSZ.decode(contentKey.asSeq(), ContentKey))
except SerializationError:
return Opt.none(ContentKey)

# TODO: change to correct content id derivation
func toContentId*(contentKey: ContentKeyByteList): ContentId =
# TODO: Should we try to parse the content key here for invalid ones?
let idHash = sha2.sha256.digest(contentKey.asSeq())
readUintBE[256](idHash.data)

func toContentId*(contentKey: ContentKey): ContentId =
toContentId(encode(contentKey))

func `$`*(x: BlockNumberKey): string =
"block_number: " & $x.blockNumber

func `$`*(x: ContentKey): string =
var res = "(type: " & $x.contentType & ", "

case x.contentType
of unused:
raiseAssert "ContentKey may not have unused value as content type"
of blockBody:
res.add($x.blockBodyKey)
of receipts:
res.add($x.receiptsKey)

res.add(")")

res
14 changes: 14 additions & 0 deletions portal/network/finalized_history/content/content_values.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Nimbus
# Copyright (c) 2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import eth/common/blocks_rlp, eth/common/receipts_rlp

export blocks_rlp, receipts_rlp

type Receipts* = seq[Receipt]
12 changes: 12 additions & 0 deletions portal/network/finalized_history/finalized_history_content.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Nimbus
# Copyright (c) 2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import ./content/content_keys, ./content/content_values

export content_keys, content_values
204 changes: 204 additions & 0 deletions portal/network/finalized_history/finalized_history_network.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
# Nimbus
# Copyright (c) 2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import
results,
chronos,
chronicles,
metrics,
eth/common/headers,
eth/p2p/discoveryv5/[protocol, enr],
../../common/common_types,
../../database/content_db,
# ../network_metadata,
../wire/[portal_protocol, portal_stream, portal_protocol_config, ping_extensions],
"."/[finalized_history_content, finalized_history_validation]

from eth/common/accounts import EMPTY_ROOT_HASH

logScope:
topics = "portal_fin_hist"

const pingExtensionCapabilities = {CapabilitiesType, HistoryRadiusType}

type
FinalizedHistoryNetwork* = ref object
portalProtocol*: PortalProtocol
contentDB*: ContentDB
contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])]
# cfg*: RuntimeConfig
processContentLoops: seq[Future[void]]
statusLogLoop: Future[void]
contentRequestRetries: int
contentQueueWorkers: int

func toContentIdHandler(contentKey: ContentKeyByteList): results.Opt[ContentId] =
ok(toContentId(contentKey))

proc new*(
T: type FinalizedHistoryNetwork,
portalNetwork: PortalNetwork,
baseProtocol: protocol.Protocol,
contentDB: ContentDB,
streamManager: StreamManager,
# cfg: RuntimeConfig,
bootstrapRecords: openArray[Record] = [],
portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig,
contentRequestRetries = 1,
contentQueueWorkers = 50,
contentQueueSize = 50,
): T =
let
contentQueue =
newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](contentQueueSize)

stream = streamManager.registerNewStream(contentQueue)

portalProtocol = PortalProtocol.new(
baseProtocol,
[byte(0x50), 0x00], # TODO: Adapt getProtocolId
toContentIdHandler,
createGetHandler(contentDB),
createStoreHandler(contentDB, portalConfig.radiusConfig),
createContainsHandler(contentDB),
createRadiusHandler(contentDB),
stream,
bootstrapRecords,
config = portalConfig,
pingExtensionCapabilities = pingExtensionCapabilities,
)

FinalizedHistoryNetwork(
portalProtocol: portalProtocol,
contentDB: contentDB,
contentQueue: contentQueue,
# cfg: cfg,
contentRequestRetries: contentRequestRetries,
contentQueueWorkers: contentQueueWorkers,
)

proc validateContent(
n: FinalizedHistoryNetwork, content: seq[byte], contentKeyBytes: ContentKeyByteList
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
# TODO: specs might turn out to just disable offers. Although I think for for getting initial data in the network
# this might be an issue. Unless history expiry gets deployed together with Portal.
let contentKey = finalized_history_content.decode(contentKeyBytes).valueOr:
return err("Error decoding content key")

case contentKey.contentType
of unused:
raiseAssert("ContentKey contentType: unused")
of blockBody:
let
# TODO: Need to get the header (or just tx root/uncle root/withdrawals root) from the EL client via
# JSON-RPC.
# OR if directly integrated the EL client, we can just pass the header here.
header = Header()
blockBody = decodeRlp(content, BlockBody).valueOr:
return err("Error decoding block body: " & error)
validateBlockBody(blockBody, header).isOkOr:
return err("Failed validating block body: " & error)

ok()
of receipts:
let
# TODO: Need to get the header (or just tx root/uncle root/withdrawals root) from the EL client via
# JSON-RPC.
# OR if directly integrated the EL client, we can just pass the header here.
header = Header()
receipts = decodeRlp(content, seq[Receipt]).valueOr:
return err("Error decoding receipts: " & error)
validateReceipts(receipts, header.receiptsRoot).isOkOr:
return err("Failed validating receipts: " & error)

ok()

proc validateContent(
n: FinalizedHistoryNetwork,
srcNodeId: Opt[NodeId],
contentKeys: ContentKeysList,
contentItems: seq[seq[byte]],
): Future[bool] {.async: (raises: [CancelledError]).} =
# content passed here can have less items then contentKeys, but not more.
for i, contentItem in contentItems:
let contentKey = contentKeys[i]
let res = await n.validateContent(contentItem, contentKey)
if res.isOk():
let contentId = n.portalProtocol.toContentId(contentKey).valueOr:
warn "Received offered content with invalid content key", srcNodeId, contentKey
return false

n.portalProtocol.storeContent(
contentKey, contentId, contentItem, cacheOffer = true
)

debug "Received offered content validated successfully", srcNodeId, contentKey
else:
if srcNodeId.isSome():
n.portalProtocol.banNode(srcNodeId.get(), NodeBanDurationOfferFailedValidation)

debug "Received offered content failed validation",
srcNodeId, contentKey, error = res.error
return false

return true

proc contentQueueWorker(n: FinalizedHistoryNetwork) {.async: (raises: []).} =
try:
while true:
let (srcNodeId, contentKeys, contentItems) = await n.contentQueue.popFirst()

if await n.validateContent(srcNodeId, contentKeys, contentItems):
portal_offer_validation_successful.inc(
labelValues = [$n.portalProtocol.protocolId]
)

discard await n.portalProtocol.neighborhoodGossip(
srcNodeId, contentKeys, contentItems
)
else:
portal_offer_validation_failed.inc(labelValues = [$n.portalProtocol.protocolId])
except CancelledError:
trace "contentQueueWorker canceled"

proc statusLogLoop(n: FinalizedHistoryNetwork) {.async: (raises: []).} =
try:
while true:
await sleepAsync(60.seconds)

info "History network status",
routingTableNodes = n.portalProtocol.routingTable.len()
except CancelledError:
trace "statusLogLoop canceled"

proc start*(n: FinalizedHistoryNetwork) =
info "Starting Portal finalized chain history network",
protocolId = n.portalProtocol.protocolId

n.portalProtocol.start()

for i in 0 ..< n.contentQueueWorkers:
n.processContentLoops.add(contentQueueWorker(n))

n.statusLogLoop = statusLogLoop(n)

proc stop*(n: FinalizedHistoryNetwork) {.async: (raises: []).} =
info "Stopping Portal finalized chain history network"

var futures: seq[Future[void]]
futures.add(n.portalProtocol.stop())

for loop in n.processContentLoops:
futures.add(loop.cancelAndWait())
if not n.statusLogLoop.isNil:
futures.add(n.statusLogLoop.cancelAndWait())
await noCancel(allFutures(futures))

n.processContentLoops.setLen(0)
n.statusLogLoop = nil
59 changes: 59 additions & 0 deletions portal/network/finalized_history/finalized_history_validation.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Nimbus
# Copyright (c) 2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import
eth/trie/ordered_trie,
eth/common/[headers_rlp, blocks_rlp, receipts, hashes],
./finalized_history_content

func validateBlockBody*(
body: BlockBody, header: Header
): Result[void, string] =
## Validate the block body against the txRoot, ommersHash and withdrawalsRoot
## from the header.
## TODO: could add block number vs empty ommersHash + existing withdrawalsRoot check
let calculatedOmmersHash = keccak256(rlp.encode(body.uncles)) # TODO: avoid having to re-encode the uncles
if calculatedOmmersHash != header.ommersHash:
return err("Invalid ommers hash: expected " & $header.ommersHash & " - got " &
$calculatedOmmersHash)

let calculatedTxsRoot = orderedTrieRoot(body.transactions)
if calculatedTxsRoot != header.txRoot:
return err(
"Invalid transactions root: expected " & $header.txRoot & " - got " &
$calculatedTxsRoot
)

if header.withdrawalsRoot.isSome() and body.withdrawals.isNone() or
header.withdrawalsRoot.isNone() and body.withdrawals.isSome():
return err(
"Invalid withdrawals"
)

if header.withdrawalsRoot.isSome() and body.withdrawals.isSome():
let
calculatedWithdrawalsRoot = orderedTrieRoot(body.withdrawals.value())
headerWithdrawalsRoot = header.withdrawalsRoot.get()
if calculatedWithdrawalsRoot != headerWithdrawalsRoot:
return err(
"Invalid withdrawals root: expected " & $headerWithdrawalsRoot & " - got " &
$calculatedWithdrawalsRoot
)

ok()

func validateReceipts*(
receipts: Receipts, receiptsRoot: Hash32
): Result[void, string] =
let calculatedReceiptsRoot = orderedTrieRoot(receipts)
if calculatedReceiptsRoot != receiptsRoot:
err("Unexpected receipt root: expected " & $receiptsRoot &
" - got " & $calculatedReceiptsRoot)
else:
ok()
Loading