diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 277886cf37..9b876658d5 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -25,10 +25,10 @@ import blockchain_dag, blob_quarantine, block_quarantine, consensus_manager, attestation_pool, sync_committee_msg_pool, validator_change_pool, blockchain_list], - ./spec/datatypes/[base, altair], + ./spec/datatypes/[base, altair, fulu], ./spec/eth2_apis/dynamic_fee_recipients, ./spec/signatures_batch, - ./sync/[sync_manager, request_manager, sync_types], + ./sync/[sync_manager, request_manager, sync_types, validator_custody], ./validators/[ action_tracker, message_router, validator_monitor, validator_pool, keystore_management], @@ -96,6 +96,7 @@ type eventBus*: EventBus vcProcess*: Process requestManager*: RequestManager + validatorCustody*: ValidatorCustodyRef syncManager*: SyncManager[Peer, PeerId] backfiller*: SyncManager[Peer, PeerId] untrustedManager*: SyncManager[Peer, PeerId] diff --git a/beacon_chain/consensus_object_pools/blob_quarantine.nim b/beacon_chain/consensus_object_pools/blob_quarantine.nim index e1452a1390..74e886fe2a 100644 --- a/beacon_chain/consensus_object_pools/blob_quarantine.nim +++ b/beacon_chain/consensus_object_pools/blob_quarantine.nim @@ -76,9 +76,12 @@ type BlobQuarantine* = SidecarQuarantine[BlobSidecar, OnBlobSidecarCallback] + ColumnQuarantine* = SidecarQuarantine[DataColumnSidecar, OnDataColumnSidecarCallback] + ColumnQuarantineRef* = ref ColumnQuarantine + func isEmpty[A](holder: SidecarHolder[A]): bool = holder.kind == SidecarHolderKind.Empty diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index c48fee4c1d..432846facb 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -131,6 +131,15 @@ type era*: EraDB + eaSlot*: Slot + ## Earliest available slot is the earliest slot at which the BN can + ## guarantee serving blocks with sidecars. + + erSlot*: Slot + ## Earliest refilled slot is the earliest slot at which excess + ## DataColumnSidecar downloading finishes, if erSlot = GENESIS_SLOT + ## we can deduce that validator custody is inactive. + validatorMonitor*: ref ValidatorMonitor forkBlocks*: HashSet[KeyedBlockRef] @@ -291,6 +300,7 @@ type BlockData* = object blck*: ForkedSignedBeaconBlock blob*: Opt[BlobSidecars] + dataColumn*: Opt[DataColumnSidecars] OnBlockAdded*[T: ForkyTrustedSignedBeaconBlock] = proc( blckRef: BlockRef, blck: T, epochRef: EpochRef, @@ -397,14 +407,7 @@ func horizon*(dag: ChainDAGRef): Slot = GENESIS_SLOT func earliestAvailableSlot*(dag: ChainDAGRef): Slot = - if dag.backfill.slot < dag.tail.slot and - dag.backfill.slot != GENESIS_SLOT: - # When the BN is backfilling, backfill slot is the earliest - # persisted block. - dag.backfill.slot - else: - # When the BN has backfilled, tail moves progressively. 
- dag.tail.slot + max(dag.eaSlot, dag.erSlot) template epoch*(e: EpochRef): Epoch = e.key.epoch diff --git a/beacon_chain/consensus_object_pools/block_quarantine.nim b/beacon_chain/consensus_object_pools/block_quarantine.nim index dca0140768..e740c93871 100644 --- a/beacon_chain/consensus_object_pools/block_quarantine.nim +++ b/beacon_chain/consensus_object_pools/block_quarantine.nim @@ -166,7 +166,7 @@ func cleanupUnviable(quarantine: var Quarantine) = break # Cannot modify while for-looping quarantine.unviable.del(toDel) -func removeUnviableOrphanTree( +proc removeUnviableOrphanTree( quarantine: var Quarantine, toCheck: var seq[Eth2Digest], tbl: var OrderedTable[(Eth2Digest, ValidatorSig), ForkedSignedBeaconBlock] @@ -192,13 +192,15 @@ func removeUnviableOrphanTree( for k in toRemove: tbl.del k + info "FOO9 in removeUnviableOrphans", + blockRoot = shortLog(k[0]) quarantine.unviable[k[0]] = () toRemove.setLen(0) checked -func removeUnviableSidecarlessTree( +proc removeUnviableSidecarlessTree( quarantine: var Quarantine, toCheck: var seq[Eth2Digest], tbl: var OrderedTable[Eth2Digest, ForkedSignedBeaconBlock]) = @@ -218,11 +220,16 @@ func removeUnviableSidecarlessTree( for k in toRemove: tbl.del k - quarantine.unviable[k] = () + info "FOOA in removeUnviableSidecarlessTree", + blockRoot = shortLog(k) + #quarantine.unviable[k] = () toRemove.setLen(0) -func addUnviable*(quarantine: var Quarantine, root: Eth2Digest) = +# TODO revert to func when addUnviable logging gone +proc addUnviable*(quarantine: var Quarantine, root: Eth2Digest) = + info "FOO8 in addUnviable", blockRoot = shortLog(root), st = getStackTrace() + # Unviable - don't try to download again! quarantine.missing.del(root) @@ -236,7 +243,8 @@ func addUnviable*(quarantine: var Quarantine, root: Eth2Digest) = quarantine.unviable[root] = () -func cleanupOrphans(quarantine: var Quarantine, finalizedSlot: Slot) = +# TODO revert to func when addUnviable logging gone +proc cleanupOrphans(quarantine: var Quarantine, finalizedSlot: Slot) = var toDel: seq[(Eth2Digest, ValidatorSig)] for k, v in quarantine.orphans: @@ -247,7 +255,8 @@ func cleanupOrphans(quarantine: var Quarantine, finalizedSlot: Slot) = quarantine.addUnviable k[0] quarantine.orphans.del k -func cleanupSidecarless(quarantine: var Quarantine, finalizedSlot: Slot) = +# TODO revert to func when addUnviable logging gone +proc cleanupSidecarless(quarantine: var Quarantine, finalizedSlot: Slot) = var toDel: seq[Eth2Digest] for k, v in quarantine.sidecarless: @@ -265,7 +274,8 @@ func clearAfterReorg*(quarantine: var Quarantine) = quarantine.missing.reset() quarantine.orphans.reset() -func pruneAfterFinalization*( +# TODO revert to func when addUnviable logging gone +proc pruneAfterFinalization*( quarantine: var Quarantine, epoch: Epoch, needsBackfill: bool @@ -276,14 +286,14 @@ func pruneAfterFinalization*( # Because Quarantine could be used as temporary storage for blocks which # do not have sidecars yet, we should not prune blocks which are behind # `MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS` epoch. Otherwise we will not - # be able to backfill this blocks properly. + # be able to backfill these blocks properly. 
if epoch < quarantine.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: Epoch(0) else: epoch - quarantine.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS else: epoch - slot = (startEpoch + 1).start_slot() + slot = startEpoch.start_slot() quarantine.cleanupSidecarless(slot) @@ -422,6 +432,14 @@ func popColumnless*( ): Opt[ForkedSignedBeaconBlock] {.deprecated.} = quarantine.popSidecarless(root) +func getColumnless*( + quarantine: var Quarantine, + root: Eth2Digest): Opt[ForkedSignedBeaconBlock] = + try: + Opt.some(quarantine.sidecarless[root]) + except KeyError: + Opt.none(ForkedSignedBeaconBlock) + func popBlobless*( quarantine: var Quarantine, root: Eth2Digest diff --git a/beacon_chain/consensus_object_pools/blockchain_list.nim b/beacon_chain/consensus_object_pools/blockchain_list.nim index b7677f8b0b..60c0fd699e 100644 --- a/beacon_chain/consensus_object_pools/blockchain_list.nim +++ b/beacon_chain/consensus_object_pools/blockchain_list.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH +# Copyright (c) 2018-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). diff --git a/beacon_chain/el/el_manager.nim b/beacon_chain/el/el_manager.nim index 7857145f9d..d65070f9c5 100644 --- a/beacon_chain/el/el_manager.nim +++ b/beacon_chain/el/el_manager.nim @@ -772,7 +772,6 @@ proc sendGetBlobsV2*( return err() let deadline = sleepAsync(GETBLOBS_TIMEOUT) - var bestIdx: Opt[int] while true: diff --git a/beacon_chain/el/engine_api_conversions.nim b/beacon_chain/el/engine_api_conversions.nim index 996d0ddece..2a78a907f9 100644 --- a/beacon_chain/el/engine_api_conversions.nim +++ b/beacon_chain/el/engine_api_conversions.nim @@ -232,11 +232,11 @@ func asConsensusType*( # The `mapIt` calls below are necessary only because we use different distinct # types for KZG commitments and Blobs in the `web3` and the `deneb` spec types. # Both are defined as `array[N, byte]` under the hood. 
- blobsBundle: deneb.BlobsBundle( + blobsBundle: fulu.BlobsBundleV2( commitments: KzgCommitments.init( payload.blobsBundle.commitments.mapIt( kzg_abi.KzgCommitment(bytes: it.data))), - proofs: KzgProofs.init( + proofs: KzgProofsV2.init( payload.blobsBundle.proofs.mapIt( kzg_abi.KzgProof(bytes: it.data))), blobs: Blobs.init( diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 620b371c03..46e18edae4 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -9,7 +9,9 @@ import chronicles, chronos, metrics, - ../spec/[forks, helpers_el, signatures, signatures_batch], + ../spec/[ + forks, helpers_el, signatures, signatures_batch, + peerdas_helpers], ../sszdump from std/deques import Deque, addLast, contains, initDeque, items, len, shrink @@ -26,13 +28,15 @@ from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot from ../consensus_object_pools/block_pools_types import EpochRef, VerifierError from ../consensus_object_pools/block_quarantine import - addSidecarless, addOrphan, addUnviable, pop, removeOrphan + addBlobless, addSidecarless, addColumnless, addOrphan, addUnviable, + pop, removeOrphan from ../consensus_object_pools/blob_quarantine import - BlobQuarantine, popSidecars, put + BlobQuarantine, ColumnQuarantine, popSidecars, put from ../validators/validator_monitor import MsgSource, ValidatorMonitor, registerAttestationInBlock, registerBeaconBlock, registerSyncAggregateInBlock -from ../beacon_chain_db import getBlobSidecar, putBlobSidecar +from ../beacon_chain_db import getBlobSidecar, putBlobSidecar, + getDataColumnSidecar, putDataColumnSidecar from ../spec/state_transition_block import validate_blobs export sszdump, signatures_batch @@ -57,6 +61,7 @@ type BlockEntry = object blck*: ForkedSignedBeaconBlock blobs*: Opt[BlobSidecars] + columns*: Opt[DataColumnSidecars] maybeFinalized*: bool ## The block source claims the block has been finalized already resfut*: Future[Result[void, VerifierError]].Raising([CancelledError]) @@ -102,6 +107,7 @@ type getBeaconTime: GetBeaconTimeFn blobQuarantine: ref BlobQuarantine + dataColumnQuarantine*: ref ColumnQuarantine verifier: BatchVerifier lastPayload: Slot @@ -128,12 +134,12 @@ proc new*(T: type BlockProcessor, consensusManager: ref ConsensusManager, validatorMonitor: ref ValidatorMonitor, blobQuarantine: ref BlobQuarantine, + dataColumnQuarantine: ref ColumnQuarantine, getBeaconTime: GetBeaconTimeFn, invalidBlockRoots: seq[Eth2Digest] = @[]): ref BlockProcessor = if invalidBlockRoots.len > 0: warn "Config requests blocks to be treated as invalid", debugInvalidateBlockRoot = invalidBlockRoots - (ref BlockProcessor)( dumpEnabled: dumpEnabled, dumpDirInvalid: dumpDirInvalid, @@ -143,6 +149,7 @@ proc new*(T: type BlockProcessor, consensusManager: consensusManager, validatorMonitor: validatorMonitor, blobQuarantine: blobQuarantine, + dataColumnQuarantine: dataColumnQuarantine, getBeaconTime: getBeaconTime, verifier: batchVerifier[] ) @@ -180,15 +187,59 @@ from ../consensus_object_pools/block_clearance import proc storeBackfillBlock( self: var BlockProcessor, signedBlock: ForkySignedBeaconBlock, - blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] = + blobsOpt: Opt[BlobSidecars], + dataColumnsOpt: Opt[DataColumnSidecars]): + Result[void, VerifierError] = # The block is certainly not missing any more self.consensusManager.quarantine[].missing.del(signedBlock.root) + var columnsOk = true + when 
typeof(signedBlock).kind >= ConsensusFork.Fulu: + var malformed_cols: seq[int] + if dataColumnsOpt.isSome: + let columns = dataColumnsOpt.get() + let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq + if columns.len > 0 and kzgCommits.len > 0: + for i in 0.. + (self.consensusManager.dag.cfg.NUMBER_OF_COLUMNS div 2) and + not columnsOk: + let + recovered_cps = + recover_cells_and_proofs(columns) + recovered_columns = + signedBlock.get_data_column_sidecars(recovered_cps.get) + + for mc in malformed_cols: + # copy the healed columns only into the + # sidecar spaces + columns[mc][] = recovered_columns[mc] + columnsOk = true + + if not columnsOk: + return err(VerifierError.Invalid) + # Establish blob viability before calling addbackfillBlock to avoid # writing the block in case of blob error. var blobsOk = true - when typeof(signedBlock).kind >= ConsensusFork.Deneb: + when typeof(signedBlock).kind in [ConsensusFork.Deneb, ConsensusFork.Electra]: if blobsOpt.isSome: let blobs = blobsOpt.get() let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq @@ -206,6 +257,9 @@ proc storeBackfillBlock( blobsOk = r.isOk() if not blobsOk: + info "FOO3 invalid because invalid blobs, shouldn't be triggering here", + blockRoot = shortLog(signedBlock.root), + slot = signedBlock.message.slot return err(VerifierError.Invalid) let res = self.consensusManager.dag.addBackfillBlock(signedBlock) @@ -217,11 +271,19 @@ proc storeBackfillBlock( self.consensusManager.quarantine[].unviable: # DAG doesn't know about unviable ancestor blocks - we do! Translate # this to the appropriate error so that sync etc doesn't retry the block + info "FOO1 marking unviable because addBackfillBlock + parent", + blockRoot = shortLog(signedBlock.root), + slot = signedBlock.message.slot, + parentBlockRoot = shortLog(signedBlock.message.parent_root) self.consensusManager.quarantine[].addUnviable(signedBlock.root) return err(VerifierError.UnviableFork) of VerifierError.UnviableFork: # Track unviables so that descendants can be discarded properly + info "FOO2 marking unviable because addBackFillblockt", + blockRoot = shortLog(signedBlock.root), + slot = signedBlock.message.slot, + parentBlockRoot = shortLog(signedBlock.message.parent_root) self.consensusManager.quarantine[].addUnviable(signedBlock.root) else: discard return res @@ -231,6 +293,12 @@ proc storeBackfillBlock( for b in blobs: self.consensusManager.dag.db.putBlobSidecar(b[]) + # Only store data columns after successfully establishing block validity + let + columns = dataColumnsOpt.valueOr: DataColumnSidecars @[] + for c in columns: + self.consensusManager.dag.db.putDataColumnSidecar(c[]) + res from web3/engine_api_types import @@ -377,19 +445,19 @@ proc getExecutionValidity( blck = shortLog(blck) return NewPayloadStatus.noResponse -proc checkBloblessSignature( +proc checkBlobOrColumnlessSignature( self: BlockProcessor, signed_beacon_block: deneb.SignedBeaconBlock | electra.SignedBeaconBlock | fulu.SignedBeaconBlock): Result[void, cstring] = let dag = self.consensusManager.dag let parent = dag.getBlockRef(signed_beacon_block.message.parent_root).valueOr: - return err("checkBloblessSignature called with orphan block") + return err("checkBlobOrColumnlessSignature called with orphan block") let proposer = getProposer( dag, parent, signed_beacon_block.message.slot).valueOr: - return err("checkBloblessSignature: Cannot compute proposer") + return err("checkBlobOrColumnlessSignature: Cannot compute proposer") if distinctBase(proposer) != 
signed_beacon_block.message.proposer_index: - return err("checkBloblessSignature: Incorrect proposer") + return err("checkBlobOrColumnlessSignature: Incorrect proposer") if not verify_block_signature( dag.forkAtEpoch(signed_beacon_block.message.slot.epoch), getStateField(dag.headState, genesis_validators_root), @@ -397,12 +465,12 @@ proc checkBloblessSignature( signed_beacon_block.root, dag.validatorKey(proposer).get(), signed_beacon_block.signature): - return err("checkBloblessSignature: Invalid proposer signature") + return err("checkBlobOrColumnlessSignature: Invalid proposer signature") ok() proc enqueueBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars], + blobs: Opt[BlobSidecars], data_columns: Opt[DataColumnSidecars], resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil, maybeFinalized = false, validationDur = Duration()) = @@ -410,7 +478,7 @@ proc enqueueBlock*( if forkyBlck.message.slot <= self.consensusManager.dag.finalizedHead.slot: # let backfill blocks skip the queue - these are always "fast" to process # because there are no state rewinds to deal with - let res = self.storeBackfillBlock(forkyBlck, blobs) + let res = self.storeBackfillBlock(forkyBlck, blobs, data_columns) resfut.complete(res) return @@ -418,6 +486,7 @@ proc enqueueBlock*( self.blockQueue.addLastNoWait(BlockEntry( blck: blck, blobs: blobs, + columns: data_columns, maybeFinalized: maybeFinalized, resfut: resfut, queueTick: Moment.now(), validationDur: validationDur, @@ -445,6 +514,7 @@ proc storeBlock( self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime, signedBlock: ForkySignedBeaconBlock, blobsOpt: Opt[BlobSidecars], + dataColumnsOpt: Opt[DataColumnSidecars], maybeFinalized = false, queueTick: Moment = Moment.now(), validationDur = Duration()): Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async: (raises: [CancelledError]).} = @@ -482,6 +552,10 @@ proc storeBlock( if signedBlock.message.parent_root in self.consensusManager.quarantine[].unviable: # DAG doesn't know about unviable ancestor blocks - we do however! 
+ info "FOO5 marking unviable because of parent root aleady being in quarantine", + blockRoot = shortLog(signedBlock.root), + slot = signedBlock.message.slot, + parentBlockRoot = shortLog(signedBlock.message.parent_root) self.consensusManager.quarantine[].addUnviable(signedBlock.root) return err((VerifierError.UnviableFork, ProcessingStatus.completed)) @@ -501,6 +575,9 @@ proc storeBlock( else: if blobsOpt.isSome: self.blobQuarantine[].put(signedBlock.root, blobsOpt.get) + if dataColumnsOpt.isSome: + self.dataColumnQuarantine[].put(signedBlock.root, dataColumnsOpt.get) + debug "Block quarantined", blockRoot = shortLog(signedBlock.root), blck = shortLog(signedBlock.message), @@ -508,6 +585,10 @@ proc storeBlock( of VerifierError.UnviableFork: # Track unviables so that descendants can be discarded promptly + info "FOO4 marking unviable", + blockRoot = shortLog(signedBlock.root), + slot = signedBlock.message.slot, + parentBlockRoot = shortLog(signedBlock.message.parent_root) self.consensusManager.quarantine[].addUnviable(signedBlock.root) else: discard @@ -541,10 +622,25 @@ proc storeBlock( parent_root = signedBlock.message.parent_root parentBlck = dag.getForkedBlock(parent_root) if parentBlck.isSome(): + var columnsOk = true + let columns = + withBlck(parentBlck.get()): + when consensusFork >= ConsensusFork.Fulu: + var data_column_sidecars: DataColumnSidecars + for i in self.dataColumnQuarantine[].custodyColumns: + let data_column = DataColumnSidecar.new() + if not dag.db.getDataColumnSidecar(parent_root, i.ColumnIndex, data_column[]): + columnsOk = false + break + data_column_sidecars.add data_column + Opt.some data_column_sidecars + else: + Opt.none DataColumnSidecars + var blobsOk = true let blobs = withBlck(parentBlck.get()): - when consensusFork >= ConsensusFork.Deneb: + when consensusFork in [ConsensusFork.Deneb, ConsensusFork.Electra]: var blob_sidecars: BlobSidecars for i in 0 ..< forkyBlck.message.body.blob_kzg_commitments.len: let blob = BlobSidecar.new() @@ -555,10 +651,27 @@ proc storeBlock( Opt.some blob_sidecars else: Opt.none BlobSidecars - if blobsOk: + # Blobs and columns can never co-exist in the same block + # Block has neither blob sidecar nor data column sidecar + if blobs.isNone and columns.isNone: debug "Loaded parent block from storage", parent_root self[].enqueueBlock( - MsgSource.gossip, parentBlck.unsafeGet().asSigned(), blobs) + MsgSource.gossip, parentBlck.unsafeGet().asSigned(), Opt.none(BlobSidecars), + Opt.none(DataColumnSidecars)) + # Block has blob sidecars associated and NO data column sidecars + # as they cannot co-exist. + if blobsOk and blobs.isSome: + debug "Loaded parent block from storage", parent_root + self[].enqueueBlock( + MsgSource.gossip, parentBlck.unsafeGet().asSigned(), blobs, + Opt.none(DataColumnSidecars)) + # Block has data column sidecars associated and NO blob sidecars + # as they cannot co-exist. 
+ if columnsOk and columns.isSome: + debug "Loaded parent block from storage", parent_root + self[].enqueueBlock( + MsgSource.gossip, parentBlck.unsafeGet().asSigned(), Opt.none(BlobSidecars), + columns) return handleVerifierError(parent.error()) @@ -632,10 +745,28 @@ proc storeBlock( let newPayloadTick = Moment.now() + when typeof(signedBlock).kind >= ConsensusFork.Fulu: + if dataColumnsOpt.isSome: + let + columns0 = dataColumnsOpt.get() + kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq + if columns0.len > 0 and kzgCommits.len > 0: + for i in 0..= ConsensusFork.Deneb: + elif typeof(signedBlock).kind >= ConsensusFork.Deneb: if blobsOpt.isSome: let blobs = blobsOpt.get() let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq @@ -698,6 +829,11 @@ proc storeBlock( for b in blobs: self.consensusManager.dag.db.putBlobSidecar(b[]) + # write data columns now that block has been written + let data_columns = dataColumnsOpt.valueOr: DataColumnSidecars @[] + for col in data_columns: + self.consensusManager.dag.db.putDataColumnSidecar(col[]) + let addHeadBlockTick = Moment.now() # Eagerly update head: the incoming block "should" get selected. @@ -840,21 +976,48 @@ proc storeBlock( withBlck(quarantined): when typeof(forkyBlck).kind < ConsensusFork.Deneb: self[].enqueueBlock( - MsgSource.gossip, quarantined, Opt.none(BlobSidecars)) - else: + MsgSource.gossip, quarantined, Opt.none(BlobSidecars), + Opt.none(DataColumnSidecars)) + elif typeof(forkyBlck).kind >= ConsensusFork.Fulu: if len(forkyBlck.message.body.blob_kzg_commitments) == 0: self[].enqueueBlock( - MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[])) + MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[]), + Opt.some(DataColumnSidecars @[])) else: - if (let res = checkBloblessSignature(self[], forkyBlck); res.isErr): + if (let res = checkBlobOrColumnlessSignature(self[], + forkyBlck); + res.isErr): warn "Failed to verify signature of unorphaned blobless block", blck = shortLog(forkyBlck), error = res.error() continue + let cres = + self.dataColumnQuarantine[].popSidecars(forkyBlck.root, forkyBlck) + if cres.isSome: + self[].enqueueBlock( + MsgSource.gossip, quarantined, Opt.none(BlobSidecars), + cres) + else: + discard self.consensusManager.quarantine[].addColumnless( + dag.finalizedHead.slot, forkyBlck) + elif typeof(forkyBlck).kind in [ConsensusFork.Deneb, ConsensusFork.Electra]: + if len(forkyBlck.message.body.blob_kzg_commitments) == 0: + self[].enqueueBlock( + MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[]), + Opt.some(DataColumnSidecars @[])) + else: + if (let res = checkBlobOrColumnlessSignature(self[], + forkyBlck); + res.isErr): + warn "Failed to verify signature of unorphaned columnless block", + blck = shortLog(forkyBlck), + error = res.error() + continue let bres = self.blobQuarantine[].popSidecars(forkyBlck.root, forkyBlck) if bres.isSome(): - self[].enqueueBlock(MsgSource.gossip, quarantined, bres) + self[].enqueueBlock(MsgSource.gossip, quarantined, bres, + Opt.none(DataColumnSidecars)) else: self.consensusManager.quarantine[].addSidecarless(forkyBlck) @@ -865,7 +1028,7 @@ proc storeBlock( proc addBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars], maybeFinalized = false, + blobs: Opt[BlobSidecars], dataColumns: Opt[DataColumnSidecars], maybeFinalized = false, validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = ## Enqueue a Gossip-validated block for consensus 
verification # Backpressure: @@ -877,7 +1040,7 @@ proc addBlock*( # - RequestManager (missing ancestor blocks) # - API let resfut = newFuture[Result[void, VerifierError]]("BlockProcessor.addBlock") - enqueueBlock(self, src, blck, blobs, resfut, maybeFinalized, validationDur) + enqueueBlock(self, src, blck, blobs, dataColumns, resfut, maybeFinalized, validationDur) resfut # Event Loop @@ -898,8 +1061,8 @@ proc processBlock( let res = withBlck(entry.blck): await self.storeBlock( - entry.src, wallTime, forkyBlck, entry.blobs, entry.maybeFinalized, - entry.queueTick, entry.validationDur) + entry.src, wallTime, forkyBlck, entry.blobs, entry.columns, + entry.maybeFinalized, entry.queueTick, entry.validationDur) if res.isErr and res.error[1] == ProcessingStatus.notCompleted: # When an execution engine returns an error or fails to respond to a @@ -910,7 +1073,7 @@ proc processBlock( # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.9/sync/optimistic.md#execution-engine-errors await sleepAsync(chronos.seconds(1)) self[].enqueueBlock( - entry.src, entry.blck, entry.blobs, entry.resfut, entry.maybeFinalized, + entry.src, entry.blck, entry.blobs, entry.columns, entry.resfut, entry.maybeFinalized, entry.validationDur) # To ensure backpressure on the sync manager, do not complete these futures. return diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 16dd94fa45..fb29e09cbe 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -8,14 +8,17 @@ {.push raises: [].} import - std/tables, + std/[tables, sequtils], chronicles, chronos, metrics, taskpools, - ../spec/[helpers, forks], + kzg4844/kzg, + ssz_serialization/types, + ../el/el_manager, + ../spec/[helpers, forks, peerdas_helpers], ../consensus_object_pools/[ blob_quarantine, block_clearance, block_quarantine, blockchain_dag, - attestation_pool, light_client_pool, sync_committee_msg_pool, - validator_change_pool], + attestation_pool, light_client_pool, + sync_committee_msg_pool, validator_change_pool], ../validators/validator_pool, ../beacon_clock, "."/[gossip_validation, block_processor, batch_validation], @@ -46,6 +49,10 @@ declareCounter blob_sidecars_received, "Number of valid blobs processed by this node" declareCounter blob_sidecars_dropped, "Number of invalid blobs dropped by this node", labels = ["reason"] +declareCounter data_column_sidecars_received, + "Number of valid data columns processed by this node" +declareCounter data_column_sidecars_dropped, + "Number of invalid data columns dropped by this node", labels = ["reason"] declareCounter beacon_attester_slashings_received, "Number of valid attester slashings processed by this node" declareCounter beacon_attester_slashings_dropped, @@ -89,6 +96,10 @@ declareHistogram beacon_block_delay, declareHistogram blob_sidecar_delay, "Time(s) between slot start and blob sidecar reception", buckets = delayBuckets +declareHistogram data_column_sidecar_delay, + "Time(s) betweeen slot start and data column sidecar reception", + buckets = delayBuckets + type DoppelgangerProtection = object broadcastStartEpoch*: Epoch ##\ @@ -144,6 +155,8 @@ type blobQuarantine*: ref BlobQuarantine + dataColumnQuarantine*: ref ColumnQuarantine + # Application-provided current time provider (to facilitate testing) getCurrentBeaconTime*: GetBeaconTimeFn @@ -167,6 +180,7 @@ proc new*(T: type Eth2Processor, lightClientPool: ref LightClientPool, quarantine: ref Quarantine, 
blobQuarantine: ref BlobQuarantine, + dataColumnQuarantine: ref ColumnQuarantine, rng: ref HmacDrbgContext, getBeaconTime: GetBeaconTimeFn, taskpool: Taskpool @@ -185,6 +199,7 @@ proc new*(T: type Eth2Processor, lightClientPool: lightClientPool, quarantine: quarantine, blobQuarantine: blobQuarantine, + dataColumnQuarantine: dataColumnQuarantine, getCurrentBeaconTime: getBeaconTime, batchCrypto: BatchCrypto.new( rng = rng, @@ -240,7 +255,8 @@ proc processSignedBeaconBlock*( self.dag.onBlockGossipAdded(ForkedSignedBeaconBlock.init(signedBlock)) let blobs = - when typeof(signedBlock).kind >= ConsensusFork.Deneb: + when typeof(signedBlock).kind >= ConsensusFork.Deneb and + typeof(signedBlock).kind < ConsensusFork.Fulu: let bres = self.blobQuarantine[].popSidecars(signedBlock.root, signedBlock) if bres.isSome(): @@ -251,13 +267,38 @@ proc processSignedBeaconBlock*( else: Opt.none(BlobSidecars) + let columns = + when typeof(signedBlock).kind >= ConsensusFork.Fulu: + let cres = + self.dataColumnQuarantine[].popSidecars(signedBlock.root, + signedBlock) + if cres.isSome(): + cres + else: + discard self.quarantine[].addColumnless(self.dag.finalizedHead.slot, + signedBlock) + return v + else: + Opt.none(DataColumnSidecars) + + info "BAR6 enqueuing block", + blockRoot = shortLog(signedBlock), + slot = signedBlock.message.slot, + parentBlockRoot = shortLog(signedBlock.message.parent_root) + self.blockProcessor[].enqueueBlock( src, ForkedSignedBeaconBlock.init(signedBlock), blobs, + columns, maybeFinalized = maybeFinalized, validationDur = nanoseconds( (self.getCurrentBeaconTime() - wallTime).nanoseconds)) + info "BAR7 enqueued block", + blockRoot = shortLog(signedBlock), + slot = signedBlock.message.slot, + parentBlockRoot = shortLog(signedBlock.message.parent_root) + # Validator monitor registration for blocks is done by the processor beacon_blocks_received.inc() beacon_block_delay.observe(delay.toFloatSeconds()) @@ -303,20 +344,104 @@ proc processBlobSidecar*( if (let o = self.quarantine[].popSidecarless(block_root); o.isSome): let blobless = o.unsafeGet() withBlck(blobless): - when consensusFork >= ConsensusFork.Deneb: + when consensusFork in [ConsensusFork.Deneb, ConsensusFork.Electra]: let bres = self.blobQuarantine[].popSidecars(block_root, forkyBlck) if bres.isSome(): - self.blockProcessor[].enqueueBlock(MsgSource.gossip, blobless, bres) + self.blockProcessor[].enqueueBlock(MsgSource.gossip, blobless, bres, + Opt.none(DataColumnSidecars)) else: self.quarantine[].addSidecarless(forkyBlck) else: - raiseAssert "Could not have been added as blobless" + raiseAssert "Could not be added as blobless" blob_sidecars_received.inc() blob_sidecar_delay.observe(delay.toFloatSeconds()) v +proc validateDataColumnSidecarFromEL*( + self: ref Eth2Processor, + block_root: Eth2Digest) + {.async: (raises: [CancelledError]).} = + let elManager = self.blockProcessor[].consensusManager.elManager + if (let o = self.quarantine[].getColumnless(block_root); o.isSome): + let columnless = o.unsafeGet() + withBlck(columnless): + when consensusFork >= ConsensusFork.Fulu: + let + blobsFromElOpt = + await elManager.sendGetBlobsV2(forkyBlck) + if blobsFromElOpt.isSome(): + let blobsEl = blobsFromElOpt.get() + # check lengths of array[BlobAndProofV2 with blobs + # kzg commitments of the signed block + if blobsEl.len == forkyBlck.message.body.blob_kzg_commitments.len: + # we have received all columns from the EL + # hence we can safely remove the columnless block from quarantine + var flat_proof: seq[kzg.KzgProof] = @[] + for item 
in blobsEl: + for proof in item.proofs: + flat_proof.add(kzg.KzgProof(bytes: proof.data)) + let + recovered_columns = + assemble_data_column_sidecars( + forkyBlck, + blobsEl.mapIt(kzg.KzgBlob(bytes: it.blob.data)), + flat_proof) + # Send notification to event stream + # and add these columns to column quarantine + for col in recovered_columns: + if col.index in self.dataColumnQuarantine[].custodyColumns: + self.dataColumnQuarantine[].put(block_root, newClone(col)) + +proc processDataColumnSidecar*( + self: ref Eth2Processor, src: MsgSource, + dataColumnSidecar: DataColumnSidecar, subnet_id: uint64): + Future[ValidationRes] {.async: (raises: [CancelledError]).} = + template block_header: untyped = dataColumnSidecar.signed_block_header.message + let block_root = hash_tree_root(block_header) + await self.validateDataColumnSidecarFromEL(block_root) + let + wallTime = self.getCurrentBeaconTime() + (_, wallSlot) = wallTime.toSlot() + logScope: + dcs = shortLog(dataColumnSidecar) + wallSlot + # Potential under/overflows are fine; would just create odd metrics and logs + let delay = wallTime - block_header.slot.start_beacon_time + debug "Data column received", delay + + let v = + self.dag.validateDataColumnSidecar(self.quarantine, self.dataColumnQuarantine, + dataColumnSidecar, wallTime, subnet_id) + if v.isErr(): + debug "Dropping data column", error = v.error() + data_column_sidecars_dropped.inc(1, [$v.error[0]]) + return v + debug "Data column validated, putting data column in quarantine" + self.dataColumnQuarantine[].put(block_root, newClone(dataColumnSidecar)) + if (let o = self.quarantine[].popColumnless(block_root); o.isSome): + let columnless = o.unsafeGet() + withBlck(columnless): + when consensusFork >= ConsensusFork.Fulu: + let cres = + self.dataColumnQuarantine[].popSidecars(block_root, forkyBlck) + if cres.isSome(): + self.blockProcessor[].enqueueBlock( + MsgSource.gossip, columnless, + Opt.none(BlobSidecars), + cres) + else: + discard self.quarantine[].addColumnless( + self.dag.finalizedHead.slot, forkyBlck) + else: + raiseAssert "Could not be added as columnless" + + data_column_sidecars_received.inc() + data_column_sidecar_delay.observe(delay.toFloatSeconds()) + + v + proc setupDoppelgangerDetection*(self: var Eth2Processor, slot: Slot) = # When another client's already running, this is very likely to detect # potential duplicate validators, which can trigger slashing. 
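# A minimal standalone sketch of the sidecar routing rule applied throughout
# this change set: a block carries at most one sidecar family -- BlobSidecars
# for Deneb/Electra, DataColumnSidecars from Fulu onwards, nothing before
# Deneb. The enum below is a stand-in for the real ConsensusFork type; it is
# illustrative only and not code from this PR.

type
  Fork = enum
    Phase0, Altair, Bellatrix, Capella, Deneb, Electra, Fulu

  SidecarKind = enum
    NoSidecars,   # pre-Deneb: no blob commitments in the block body
    Blobs,        # Deneb/Electra: BlobSidecars accompany the block
    DataColumns   # Fulu+: DataColumnSidecars (PeerDAS) accompany the block

func sidecarKind(fork: Fork): SidecarKind =
  # Mirrors the `when consensusFork ...` branches added in this diff:
  # blobs and columns never co-exist for the same block.
  if fork >= Fulu:
    DataColumns
  elif fork >= Deneb:
    Blobs
  else:
    NoSidecars

when isMainModule:
  doAssert sidecarKind(Capella) == NoSidecars
  doAssert sidecarKind(Electra) == Blobs
  doAssert sidecarKind(Fulu) == DataColumns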
diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim index 7399a8bbe3..5bec8b6c3c 100644 --- a/beacon_chain/gossip_processing/gossip_validation.nim +++ b/beacon_chain/gossip_processing/gossip_validation.nim @@ -24,6 +24,7 @@ import ".."/[beacon_clock], ./batch_validation +from std/sequtils import mapIt from libp2p/protocols/pubsub/errors import ValidationResult export results, ValidationResult diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index f7254f8679..c9356b74b4 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -95,7 +95,7 @@ type peerPingerHeartbeatFut: Future[void].Raising([CancelledError]) peerTrimmerHeartbeatFut: Future[void].Raising([CancelledError]) cfg*: RuntimeConfig - getBeaconTime: GetBeaconTimeFn + getBeaconTime*: GetBeaconTimeFn quota: TokenBucket ## Global quota mainly for high-bandwidth stuff diff --git a/beacon_chain/networking/peer_scores.nim b/beacon_chain/networking/peer_scores.nim index c6f2c10bd4..4d7327eed6 100644 --- a/beacon_chain/networking/peer_scores.nim +++ b/beacon_chain/networking/peer_scores.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH +# Copyright (c) 2018-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -35,6 +35,8 @@ const ## Peer's answer to our request is fine. PeerScoreBadValues* = -1000 ## Peer's response contains incorrect data. + PeerScoreBadColumnIntersection* = -2 + ## Peer custodies irrelevant custody columns PeerScoreBadResponse* = -1000 ## Peer's response is not in requested range. PeerScoreMissingValues* = -25 diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index fdd2c3822a..7a3f5bfc4a 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -20,8 +20,9 @@ import ./rpc/[rest_api, state_ttl_cache], ./spec/datatypes/[altair, bellatrix, phase0], ./spec/[ - engine_authentication, weak_subjectivity, peerdas_helpers], - ./sync/[sync_protocol, light_client_protocol, sync_overseer], + engine_authentication, weak_subjectivity, + peerdas_helpers], + ./sync/[sync_protocol, light_client_protocol, sync_overseer, validator_custody], ./validators/[keystore_management, beacon_validators], "."/[ beacon_node, beacon_node_light_client, deposits, @@ -440,7 +441,7 @@ proc initFullNode( blockProcessor = BlockProcessor.new( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, batchVerifier, consensusManager, node.validatorMonitor, - blobQuarantine, getBeaconTime, config.invalidBlockRoots) + blobQuarantine, dataColumnQuarantine, getBeaconTime, config.invalidBlockRoots) blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): @@ -450,7 +451,8 @@ proc initFullNode( # taken in the sync/request managers - this is an architectural compromise # that should probably be reimagined more holistically in the future. 
blockProcessor[].addBlock( - MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) + MsgSource.gossip, signedBlock, blobs, Opt.none(DataColumnSidecars), + maybeFinalized = maybeFinalized) untrustedBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): Future[Result[void, VerifierError]] {. @@ -460,10 +462,27 @@ proc initFullNode( maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = withBlck(signedBlock): - when consensusFork >= ConsensusFork.Deneb: + # Keeping Fulu first else >= Deneb means Fulu case never hits + when consensusFork >= ConsensusFork.Fulu: + let cres = dataColumnQuarantine[].popSidecars(forkyBlck.root, forkyBlck) + if cres.isSome(): + await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + Opt.none(BlobSidecars), + cres, + maybeFinalized = maybeFinalized) + else: + # We don't have all the columns for this block, so we have + # to put it in columnless quarantine. + if not quarantine[].addColumnless(dag.finalizedHead.slot, forkyBlck): + err(VerifierError.UnviableFork) + else: + err(VerifierError.MissingParent) + + elif consensusFork in [ConsensusFork.Deneb, ConsensusFork.Electra]: let bres = blobQuarantine[].popSidecars(forkyBlck.root, forkyBlck) if bres.isSome(): await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, bres, + Opt.none(DataColumnSidecars), maybeFinalized = maybeFinalized) else: # We don't have all the sidecars for this block, so we have @@ -475,7 +494,7 @@ proc initFullNode( err(VerifierError.MissingParent) else: await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - Opt.none(BlobSidecars), + Opt.none(BlobSidecars), Opt.none(DataColumnSidecars), maybeFinalized = maybeFinalized) rmanBlockLoader = proc( blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] = @@ -499,7 +518,8 @@ proc initFullNode( config.doppelgangerDetection, blockProcessor, node.validatorMonitor, dag, attestationPool, validatorChangePool, node.attachedValidators, syncCommitteeMsgPool, - lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool) + lightClientPool, quarantine, blobQuarantine, dataColumnQuarantine, + rng, getBeaconTime, taskpool) syncManagerFlags = if node.config.longRangeSync != LongRangeSyncMode.Lenient: {SyncManagerFlag.NoGenesisSync} @@ -534,6 +554,8 @@ proc initFullNode( clist.tail.get().blck.slot() else: getLocalWallSlot() + eaSlot = dag.head.slot + erSlot = dag.head.slot untrustedManager = newSyncManager[Peer, PeerId]( node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, @@ -554,6 +576,8 @@ proc initFullNode( dag.cfg.DENEB_FORK_EPOCH, getBeaconTime, (proc(): bool = syncManager.inProgress), quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier, rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader) + validatorCustody = ValidatorCustodyRef.init(node.network, dag, supernode, + getLocalHeadSlot, custodyColumns, getBeaconTime, dataColumnQuarantine) # As per EIP 7594, the BN is now categorised into a # `Fullnode` and a `Supernode`, the fullnodes custodies a @@ -602,6 +626,8 @@ proc initFullNode( dag.setReorgCb(onChainReorg) node.dag = dag + node.dag.erSlot = erSlot + node.dag.eaSlot = eaSlot node.list = clist node.blobQuarantine = blobQuarantine node.dataColumnQuarantine = dataColumnQuarantine @@ -615,6 +641,7 @@ proc initFullNode( node.blockProcessor = blockProcessor node.consensusManager = consensusManager node.requestManager = requestManager + node.validatorCustody = validatorCustody node.syncManager = 
syncManager node.backfiller = backfiller node.untrustedManager = untrustedManager @@ -1247,6 +1274,15 @@ func getSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits = subnets + node.getNextSyncCommitteeSubnets(epoch) +func readCustodyGroupSubnets(node: BeaconNode): uint64 = + let vcus_count = node.dataColumnQuarantine.custodyColumns.lenu64 + if node.config.peerdasSupernode: + node.dag.cfg.NUMBER_OF_CUSTODY_GROUPS.uint64 + elif vcus_count > node.dag.cfg.CUSTODY_REQUIREMENT.uint64: + vcus_count + else: + node.dag.cfg.CUSTODY_REQUIREMENT.uint64 + proc addAltairMessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = node.addPhase0MessageHandlers(forkDigest, slot) @@ -1296,7 +1332,15 @@ proc addElectraMessageHandlers( proc addFuluMessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = - node.addElectraMessageHandlers(forkDigest, slot) + node.addCapellaMessageHandlers(forkDigest, slot) + let + targetSubnets = node.readCustodyGroupSubnets() + custody = node.network.nodeId.get_custody_groups(max(node.dag.cfg.SAMPLES_PER_SLOT.uint64, + targetSubnets.uint64)) + + for i in custody: + let topic = getDataColumnSidecarTopic(forkDigest, i) + node.network.subscribe(topic, basicParams()) proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) = node.removePhase0MessageHandlers(forkDigest) @@ -1328,7 +1372,15 @@ proc removeElectraMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) = forkDigest, node.dag.cfg.BLOB_SIDECAR_SUBNET_COUNT_ELECTRA) proc removeFuluMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) = - node.removeElectraMessageHandlers(forkDigest) + node.removeCapellaMessageHandlers(forkDigest) + let + targetSubnets = node.readCustodyGroupSubnets() + custody = node.network.nodeId.get_custody_groups(max(node.dag.cfg.SAMPLES_PER_SLOT.uint64, + targetSubnets.uint64)) + + for i in custody: + let topic = getDataColumnSidecarTopic(forkDigest, i) + node.network.unsubscribe(topic) proc updateSyncCommitteeTopics(node: BeaconNode, slot: Slot) = template lastSyncUpdate: untyped = @@ -1604,6 +1656,7 @@ proc pruneBlobs(node: BeaconNode, slot: Slot) = withBlck(blck): when typeof(forkyBlck).kind < ConsensusFork.Deneb: continue else: + node.dag.eaSlot = forkyBlck.message.slot for j in 0..len(forkyBlck.message.body.blob_kzg_commitments) - 1: if node.db.delBlobSidecar(blocks[int(i)].root, BlobIndex(j)): count = count + 1 @@ -1622,6 +1675,7 @@ proc pruneDataColumns(node: BeaconNode, slot: Slot) = withBlck(blck): when typeof(forkyBlck).kind < ConsensusFork.Fulu: continue else: + node.dag.eaSlot = forkyBlck.message.slot for j in 0.. 
0: + var custodyColumns = + node.validatorCustody.newer_column_set.toSeq() + sort(custodyColumns) + # update custody columns + node.dataColumnQuarantine.updateColumnQuarantine( + node.dag.cfg, custodyColumns) + + # Update CGC and metadata with respect to the new detected validator custody + let new_vcus = CgcCount node.validatorCustody.newer_column_set.lenu64 + + if new_vcus > node.dag.cfg.SAMPLES_PER_SLOT.uint8: + node.network.loadCgcnetMetadataAndEnr(new_vcus) + else: + node.network.loadCgcnetMetadataAndEnr(max(node.dag.cfg.SAMPLES_PER_SLOT.uint8, + node.dag.cfg.CUSTODY_REQUIREMENT.uint8)) + + debug "Custody column count after validator custody detection attempt", + custody_columns = node.dataColumnQuarantine.custodyColumns.len + # Update nfd field for BPOs let nextForkEpoch = node.dag.cfg.nextForkEpochAtEpoch(epoch) @@ -1954,174 +2036,192 @@ proc installMessageValidators(node: BeaconNode) = for fork in ConsensusFork: withConsensusFork(fork): - let digest = forkDigests[].atConsensusFork(consensusFork) - - # beacon_block - # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/phase0/p2p-interface.md#beacon_block - node.network.addValidator( - getBeaconBlocksTopic(digest), proc ( - signedBlock: consensusFork.SignedBeaconBlock, - src: PeerId - ): ValidationResult = - if node.shouldSyncOptimistically(node.currentSlot): - toValidationResult( - node.optimisticProcessor.processSignedBeaconBlock( - signedBlock)) - else: - toValidationResult( - node.processor[].processSignedBeaconBlock( - MsgSource.gossip, signedBlock))) - - # beacon_attestation_{subnet_id} - # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id - when consensusFork >= ConsensusFork.Electra: - for it in SubnetId: - closureScope: # Needed for inner `proc`; don't lift it out of loop. - let subnet_id = it - node.network.addAsyncValidator( - getAttestationTopic(digest, subnet_id), proc ( - attestation: SingleAttestation, src: PeerId - ): Future[ValidationResult] {. - async: (raises: [CancelledError]).} = - return toValidationResult( - await node.processor.processAttestation( - MsgSource.gossip, attestation, subnet_id, - checkSignature = true, checkValidator = false))) - else: - for it in SubnetId: - closureScope: # Needed for inner `proc`; don't lift it out of loop. - let subnet_id = it - node.network.addAsyncValidator( - getAttestationTopic(digest, subnet_id), proc ( - attestation: phase0.Attestation, src: PeerId - ): Future[ValidationResult] {. 
- async: (raises: [CancelledError]).} = - return toValidationResult( - await node.processor.processAttestation( - MsgSource.gossip, attestation, subnet_id, - checkSignature = true, checkValidator = false))) - - # beacon_aggregate_and_proof - # https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.0/specs/phase0/p2p-interface.md#beacon_aggregate_and_proof - when consensusFork >= ConsensusFork.Electra: - node.network.addAsyncValidator( - getAggregateAndProofsTopic(digest), proc ( - signedAggregateAndProof: electra.SignedAggregateAndProof, - src: PeerId - ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = - return toValidationResult( - await node.processor.processSignedAggregateAndProof( - MsgSource.gossip, signedAggregateAndProof))) - else: - node.network.addAsyncValidator( - getAggregateAndProofsTopic(digest), proc ( - signedAggregateAndProof: phase0.SignedAggregateAndProof, - src: PeerId - ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = - return toValidationResult( - await node.processor.processSignedAggregateAndProof( - MsgSource.gossip, signedAggregateAndProof))) - - # attester_slashing - # https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.2/specs/phase0/p2p-interface.md#attester_slashing - # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.6/specs/electra/p2p-interface.md#modifications-in-electra - when consensusFork >= ConsensusFork.Electra: + # oops, turns out a named tuple is better + let digests = @[forkDigests[].atConsensusFork(consensusFork)] & forkDigests[].bpos.filterIt(it[1] == consensusFork).mapIt(it[2]) + + for digest in digests: + let digest = digest # lent + # beacon_block + # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/phase0/p2p-interface.md#beacon_block node.network.addValidator( - getAttesterSlashingsTopic(digest), proc ( - attesterSlashing: electra.AttesterSlashing, + getBeaconBlocksTopic(digest), proc ( + signedBlock: consensusFork.SignedBeaconBlock, + src: PeerId, + ): ValidationResult = + if node.shouldSyncOptimistically(node.currentSlot): + toValidationResult( + node.optimisticProcessor.processSignedBeaconBlock( + signedBlock)) + else: + toValidationResult( + node.processor[].processSignedBeaconBlock( + MsgSource.gossip, signedBlock))) + + # beacon_attestation_{subnet_id} + # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id + when consensusFork >= ConsensusFork.Electra: + for it in SubnetId: + closureScope: # Needed for inner `proc`; don't lift it out of loop. + let subnet_id = it + node.network.addAsyncValidator( + getAttestationTopic(digest, subnet_id), proc ( + attestation: SingleAttestation, src: PeerId + ): Future[ValidationResult] {. + async: (raises: [CancelledError]).} = + return toValidationResult( + await node.processor.processAttestation( + MsgSource.gossip, attestation, subnet_id, + checkSignature = true, checkValidator = false))) + else: + for it in SubnetId: + closureScope: # Needed for inner `proc`; don't lift it out of loop. + let subnet_id = it + node.network.addAsyncValidator( + getAttestationTopic(digest, subnet_id), proc ( + attestation: phase0.Attestation, src: PeerId + ): Future[ValidationResult] {. 
+ async: (raises: [CancelledError]).} = + return toValidationResult( + await node.processor.processAttestation( + MsgSource.gossip, attestation, subnet_id, + checkSignature = true, checkValidator = false))) + + # beacon_aggregate_and_proof + # https://github.com/ethereum/consensus-specs/blob/v1.6.0-alpha.0/specs/phase0/p2p-interface.md#beacon_aggregate_and_proof + when consensusFork >= ConsensusFork.Electra: + node.network.addAsyncValidator( + getAggregateAndProofsTopic(digest), proc ( + signedAggregateAndProof: electra.SignedAggregateAndProof, + src: PeerId + ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = + return toValidationResult( + await node.processor.processSignedAggregateAndProof( + MsgSource.gossip, signedAggregateAndProof))) + else: + node.network.addAsyncValidator( + getAggregateAndProofsTopic(digest), proc ( + signedAggregateAndProof: phase0.SignedAggregateAndProof, + src: PeerId + ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = + return toValidationResult( + await node.processor.processSignedAggregateAndProof( + MsgSource.gossip, signedAggregateAndProof))) + + # attester_slashing + # https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.2/specs/phase0/p2p-interface.md#attester_slashing + # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.6/specs/electra/p2p-interface.md#modifications-in-electra + when consensusFork >= ConsensusFork.Electra: + node.network.addValidator( + getAttesterSlashingsTopic(digest), proc ( + attesterSlashing: electra.AttesterSlashing, + src: PeerId + ): ValidationResult = + toValidationResult( + node.processor[].processAttesterSlashing( + MsgSource.gossip, attesterSlashing))) + else: + node.network.addValidator( + getAttesterSlashingsTopic(digest), proc ( + attesterSlashing: phase0.AttesterSlashing, + src: PeerId + ): ValidationResult = + toValidationResult( + node.processor[].processAttesterSlashing( + MsgSource.gossip, attesterSlashing))) + + # proposer_slashing + # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/phase0/p2p-interface.md#proposer_slashing + node.network.addValidator( + getProposerSlashingsTopic(digest), proc ( + proposerSlashing: ProposerSlashing, src: PeerId ): ValidationResult = toValidationResult( - node.processor[].processAttesterSlashing( - MsgSource.gossip, attesterSlashing))) - else: + node.processor[].processProposerSlashing( + MsgSource.gossip, proposerSlashing))) + + # voluntary_exit + # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/phase0/p2p-interface.md#voluntary_exit node.network.addValidator( - getAttesterSlashingsTopic(digest), proc ( - attesterSlashing: phase0.AttesterSlashing, + getVoluntaryExitsTopic(digest), proc ( + signedVoluntaryExit: SignedVoluntaryExit, src: PeerId ): ValidationResult = toValidationResult( - node.processor[].processAttesterSlashing( - MsgSource.gossip, attesterSlashing))) - - # proposer_slashing - # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/phase0/p2p-interface.md#proposer_slashing - node.network.addValidator( - getProposerSlashingsTopic(digest), proc ( - proposerSlashing: ProposerSlashing, - src: PeerId - ): ValidationResult = - toValidationResult( - node.processor[].processProposerSlashing( - MsgSource.gossip, proposerSlashing))) - - # voluntary_exit - # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/phase0/p2p-interface.md#voluntary_exit - node.network.addValidator( - getVoluntaryExitsTopic(digest), proc ( - signedVoluntaryExit: 
SignedVoluntaryExit, - src: PeerId - ): ValidationResult = - toValidationResult( - node.processor[].processSignedVoluntaryExit( - MsgSource.gossip, signedVoluntaryExit))) - - when consensusFork >= ConsensusFork.Altair: - # sync_committee_{subnet_id} - # https://github.com/ethereum/consensus-specs/blob/v1.4.0/specs/altair/p2p-interface.md#sync_committee_subnet_id - for subcommitteeIdx in SyncSubcommitteeIndex: - closureScope: # Needed for inner `proc`; don't lift it out of loop. - let idx = subcommitteeIdx - node.network.addAsyncValidator( - getSyncCommitteeTopic(digest, idx), proc ( - msg: SyncCommitteeMessage, - src: PeerId - ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = - return toValidationResult( - await node.processor.processSyncCommitteeMessage( - MsgSource.gossip, msg, idx))) - - # sync_committee_contribution_and_proof - # https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.2/specs/altair/p2p-interface.md#sync_committee_contribution_and_proof - node.network.addAsyncValidator( - getSyncCommitteeContributionAndProofTopic(digest), proc ( - msg: SignedContributionAndProof, - src: PeerId - ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = - return toValidationResult( - await node.processor.processSignedContributionAndProof( - MsgSource.gossip, msg))) - - when consensusFork >= ConsensusFork.Capella: - # https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.4/specs/capella/p2p-interface.md#bls_to_execution_change - node.network.addAsyncValidator( - getBlsToExecutionChangeTopic(digest), proc ( - msg: SignedBLSToExecutionChange, - src: PeerId - ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = - return toValidationResult( - await node.processor.processBlsToExecutionChange( - MsgSource.gossip, msg))) - - when consensusFork >= ConsensusFork.Deneb: - # blob_sidecar_{subnet_id} - # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blob_sidecar_subnet_id - let subnetCount = - when consensusFork >= ConsensusFork.Electra: - node.dag.cfg.BLOB_SIDECAR_SUBNET_COUNT_ELECTRA - else: - node.dag.cfg.BLOB_SIDECAR_SUBNET_COUNT - for it in 0.BlobId ..< subnetCount.BlobId: - closureScope: # Needed for inner `proc`; don't lift it out of loop. - let subnet_id = it - node.network.addValidator( - getBlobSidecarTopic(digest, subnet_id), proc ( - blobSidecar: deneb.BlobSidecar, - src: PeerId - ): ValidationResult = - toValidationResult( - node.processor[].processBlobSidecar( - MsgSource.gossip, blobSidecar, subnet_id))) + node.processor[].processSignedVoluntaryExit( + MsgSource.gossip, signedVoluntaryExit))) + + when consensusFork >= ConsensusFork.Altair: + # sync_committee_{subnet_id} + # https://github.com/ethereum/consensus-specs/blob/v1.4.0/specs/altair/p2p-interface.md#sync_committee_subnet_id + for subcommitteeIdx in SyncSubcommitteeIndex: + closureScope: # Needed for inner `proc`; don't lift it out of loop. 
+ let idx = subcommitteeIdx + node.network.addAsyncValidator( + getSyncCommitteeTopic(digest, idx), proc ( + msg: SyncCommitteeMessage, + src: PeerId + ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = + return toValidationResult( + await node.processor.processSyncCommitteeMessage( + MsgSource.gossip, msg, idx))) + + # sync_committee_contribution_and_proof + # https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.2/specs/altair/p2p-interface.md#sync_committee_contribution_and_proof + node.network.addAsyncValidator( + getSyncCommitteeContributionAndProofTopic(digest), proc ( + msg: SignedContributionAndProof, + src: PeerId + ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = + return toValidationResult( + await node.processor.processSignedContributionAndProof( + MsgSource.gossip, msg))) + + when consensusFork >= ConsensusFork.Capella: + # https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.4/specs/capella/p2p-interface.md#bls_to_execution_change + node.network.addAsyncValidator( + getBlsToExecutionChangeTopic(digest), proc ( + msg: SignedBLSToExecutionChange, + src: PeerId + ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = + return toValidationResult( + await node.processor.processBlsToExecutionChange( + MsgSource.gossip, msg))) + + # data_column_sidecar_{subnet_id} + when consensusFork >= ConsensusFork.Fulu: + # data_column_sidecar_{subnet_id} + for it in 0'u64..= ConsensusFork.Deneb: + # blob_sidecar_{subnet_id} + # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blob_sidecar_subnet_id + let subnetCount = + when consensusFork >= ConsensusFork.Electra: + node.dag.cfg.BLOB_SIDECAR_SUBNET_COUNT_ELECTRA + else: + node.dag.cfg.BLOB_SIDECAR_SUBNET_COUNT + for it in 0.BlobId ..< subnetCount.BlobId: + closureScope: # Needed for inner `proc`; don't lift it out of loop. 
+ let subnet_id = it + node.network.addValidator( + getBlobSidecarTopic(digest, subnet_id), proc ( + blobSidecar: deneb.BlobSidecar, + src: PeerId + ): ValidationResult = + toValidationResult( + node.processor[].processBlobSidecar( + MsgSource.gossip, blobSidecar, subnet_id))) node.installLightClientMessageValidators() @@ -2164,6 +2264,10 @@ proc run(node: BeaconNode) {.raises: [CatchableError].} = node.startLightClient() node.requestManager.start() + if node.network.getBeaconTime().slotOrZero.epoch >= + node.network.cfg.FULU_FORK_EPOCH: + node.requestManager.switchToColumnLoop() + node.validatorCustody.start() node.syncOverseer.start() waitFor node.updateGossipStatus(wallSlot) diff --git a/beacon_chain/rpc/rest_beacon_api.nim b/beacon_chain/rpc/rest_beacon_api.nim index 0a08497431..11dde79c26 100644 --- a/beacon_chain/rpc/rest_beacon_api.nim +++ b/beacon_chain/rpc/rest_beacon_api.nim @@ -15,7 +15,9 @@ import ./state_ttl_cache, ../beacon_node, ../consensus_object_pools/[blockchain_dag, spec_cache, validator_change_pool], - ../spec/[eth2_merkleization, forks, network, validator], + ../spec/[ + peerdas_helpers, eth2_merkleization, + forks, network, validator], ../validators/message_router_mev from ../spec/mev/bellatrix_mev import toSignedBlindedBeaconBlock @@ -1042,14 +1044,25 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = doAssert strictVerification notin node.dag.updateFlags return RestApiResponse.jsonError(Http400, InvalidBlockObjectError) - when consensusFork >= ConsensusFork.Deneb: + when consensusFork in [ConsensusFork.Deneb, ConsensusFork.Electra]: await node.router.routeSignedBeaconBlock( forkyBlck, Opt.some( forkyBlck.create_blob_sidecars(kzg_proofs, blobs)), + Opt.none(seq[DataColumnSidecar]), + checkValidator = true) + elif consensusFork >= ConsensusFork.Fulu: + let data_columns = + assemble_data_column_sidecars( + forkyBlck, blobs.mapIt(kzg.KzgBlob(bytes: it)), + @(kzg_proofs.mapIt(kzg.KzgProof(it)))) + await node.router.routeSignedBeaconBlock( + forkyBlck, Opt.none(seq[BlobSidecar]), + Opt.some(data_columns), checkValidator = true) else: await node.router.routeSignedBeaconBlock( forkyBlck, Opt.none(seq[BlobSidecar]), + Opt.none(seq[DataColumnSidecar]), checkValidator = true) if res.isErr(): @@ -1099,14 +1112,25 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = doAssert strictVerification notin node.dag.updateFlags return RestApiResponse.jsonError(Http400, InvalidBlockObjectError) - when consensusFork >= ConsensusFork.Deneb: + when consensusFork in [ConsensusFork.Deneb, ConsensusFork.Electra]: await node.router.routeSignedBeaconBlock( forkyBlck, Opt.some( forkyBlck.create_blob_sidecars(kzg_proofs, blobs)), + Opt.none(seq[DataColumnSidecar]), + checkValidator = true) + elif consensusFork >= ConsensusFork.Fulu: + let data_columns = + assemble_data_column_sidecars( + forkyBlck, blobs.mapIt(kzg.KzgBlob(bytes: it)), + @(kzg_proofs.mapIt(kzg.KzgProof(it)))) + await node.router.routeSignedBeaconBlock( + forkyBlck, Opt.none(seq[BlobSidecar]), + Opt.some(data_columns), checkValidator = true) else: await node.router.routeSignedBeaconBlock( forkyBlck, Opt.none(seq[BlobSidecar]), + Opt.none(seq[DataColumnSidecar]), checkValidator = true) if res.isErr(): @@ -1236,6 +1260,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = forkyBlck.root = hash_tree_root(forkyBlck.message) await node.router.routeSignedBeaconBlock( forkyBlck, Opt.none(seq[BlobSidecar]), + Opt.none(seq[DataColumnSidecar]), checkValidator = 
true) if res.isErr(): @@ -1319,6 +1344,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = forkyBlck.root = hash_tree_root(forkyBlck.message) await node.router.routeSignedBeaconBlock( forkyBlck, Opt.none(seq[BlobSidecar]), + Opt.none(seq[DataColumnSidecar]), checkValidator = true) if res.isErr(): diff --git a/beacon_chain/spec/beaconstate.nim b/beacon_chain/spec/beaconstate.nim index 0792e25144..339a19a9c2 100644 --- a/beacon_chain/spec/beaconstate.nim +++ b/beacon_chain/spec/beaconstate.nim @@ -564,7 +564,7 @@ func get_block_root(state: ForkedHashedBeaconState, epoch: Epoch): Eth2Digest = get_block_root(forkyState.data, epoch) # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/phase0/beacon-chain.md#get_total_balance -template get_total_balance( +template get_total_balance*( state: ForkyBeaconState, validator_indices: untyped): Gwei = ## Return the combined effective balance of the ``indices``. ## ``EFFECTIVE_BALANCE_INCREMENT`` Gwei minimum to avoid divisions by zero. @@ -922,6 +922,15 @@ func get_total_active_balance*(state: ForkyBeaconState, cache: var StateCache): cache.total_active_balance[epoch] = tab return tab +func get_total_active_balance*(state: ForkyBeaconState): Gwei = + let epoch = state.get_current_epoch() + let active_val_indices = + get_active_validator_indices(state, epoch) + var res = 0.Gwei + for vi in active_val_indices: + res += state.validators[vi].effective_balance + max(EFFECTIVE_BALANCE_INCREMENT.Gwei, res) + # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.9/specs/altair/beacon-chain.md#get_base_reward_per_increment func get_base_reward_per_increment_sqrt( total_active_balance_sqrt: uint64): Gwei = diff --git a/beacon_chain/spec/datatypes/fulu.nim b/beacon_chain/spec/datatypes/fulu.nim index b9656860c9..a8c4016146 100644 --- a/beacon_chain/spec/datatypes/fulu.nim +++ b/beacon_chain/spec/datatypes/fulu.nim @@ -129,6 +129,15 @@ type column_index*: ColumnIndex row_index*: RowIndex + # https://github.com/ethereum/builder-specs/blob/ae1d97d080a12bfb7ca248b58fb1fc6b10aed02e/specs/fulu/builder.md#blobsbundle + KzgProofsV2* = List[KzgProof, Limit FIELD_ELEMENTS_PER_EXT_BLOB * MAX_BLOB_COMMITMENTS_PER_BLOCK] + + # https://github.com/ethereum/builder-specs/blob/ae1d97d080a12bfb7ca248b58fb1fc6b10aed02e/specs/fulu/builder.md#blobsbundle + BlobsBundleV2* = object + commitments*: KzgCommitments + proofs*: KzgProofsV2 + blobs*: Blobs + # Not in spec, defined in order to compute custody subnets CgcBits* = BitArray[DATA_COLUMN_SIDECAR_SUBNET_COUNT] @@ -170,7 +179,7 @@ type ExecutionPayloadForSigning* = object executionPayload*: ExecutionPayload blockValue*: Wei - blobsBundle*: BlobsBundle + blobsBundle*: BlobsBundleV2 # [New in Fulu] executionRequests*: seq[seq[byte]] # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/deneb/beacon-chain.md#executionpayloadheader @@ -612,7 +621,7 @@ type BlockContents* = object `block`*: BeaconBlock - kzg_proofs*: KzgProofs + kzg_proofs*: KzgProofsV2 blobs*: Blobs func shortLog*(v: DataColumnSidecar): auto = diff --git a/beacon_chain/spec/eth2_apis/rest_types.nim b/beacon_chain/spec/eth2_apis/rest_types.nim index e0c028d407..1831e2309c 100644 --- a/beacon_chain/spec/eth2_apis/rest_types.nim +++ b/beacon_chain/spec/eth2_apis/rest_types.nim @@ -316,7 +316,7 @@ type FuluSignedBlockContents* = object signed_block*: fulu.SignedBeaconBlock - kzg_proofs*: deneb.KzgProofs + kzg_proofs*: fulu.KzgProofsV2 blobs*: deneb.Blobs RestPublishedSignedBlockContents* = object diff 
--git a/beacon_chain/spec/mev/fulu_mev.nim b/beacon_chain/spec/mev/fulu_mev.nim index 8dd55d822b..6a2f5f77cc 100644 --- a/beacon_chain/spec/mev/fulu_mev.nim +++ b/beacon_chain/spec/mev/fulu_mev.nim @@ -13,6 +13,7 @@ from stew/byteutils import to0xHex from ".."/datatypes/phase0 import AttesterSlashing from ".."/datatypes/capella import SignedBLSToExecutionChange from ".."/datatypes/deneb import BlobsBundle, KzgCommitments +from ".."/datatypes/fulu import BlobsBundleV2 from ".."/datatypes/electra import Attestation, AttesterSlashing, ExecutionRequests from ".."/eth2_merkleization import hash_tree_root diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim index 765aaffc13..fb14d18ed4 100644 --- a/beacon_chain/spec/network.nim +++ b/beacon_chain/spec/network.nim @@ -170,7 +170,7 @@ func getDiscoveryForkID*(cfg: RuntimeConfig, ENRForkID( fork_digest: fork_digest, next_fork_version: current_fork_version, - next_fork_epoch: FAR_FUTURE_EPOCH) + next_fork_epoch: cfg.nextForkEpochAtEpoch(epoch)) # https://github.com/ethereum/consensus-specs/blob/v1.4.0/specs/altair/p2p-interface.md#transitioning-the-gossip type GossipState* = HashSet[Epoch] diff --git a/beacon_chain/spec/peerdas_helpers.nim b/beacon_chain/spec/peerdas_helpers.nim index 118466615e..33faf47a39 100644 --- a/beacon_chain/spec/peerdas_helpers.nim +++ b/beacon_chain/spec/peerdas_helpers.nim @@ -10,19 +10,23 @@ # Uncategorized helper functions from the spec import std/[algorithm, sequtils], + chronicles, results, eth/p2p/discoveryv5/[node], kzg4844/[kzg], ssz_serialization/[ proofs, types], + stew/assign2, ./crypto, ./[helpers, digest], ./datatypes/[fulu] +from stew/staticfor import staticfor + type - CellBytes = array[fulu.CELLS_PER_EXT_BLOB, Cell] - ProofBytes = array[fulu.CELLS_PER_EXT_BLOB, KzgProof] + CellBytes* = array[fulu.CELLS_PER_EXT_BLOB, Cell] + ProofBytes* = array[fulu.CELLS_PER_EXT_BLOB, KzgProof] # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/das-core.md#compute_columns_for_custody_group iterator compute_columns_for_custody_group*(custody_group: CustodyIndex): @@ -180,10 +184,59 @@ proc recover_matrix*(partial_matrix: seq[MatrixEntry], ok(extended_matrix) +proc recover_cells_and_proofs*( + data_columns: seq[ref DataColumnSidecar]): + Result[seq[CellsAndProofs], cstring] = + ## This helper recovers blobs from the data column sidecars + if not (data_columns.len != 0): + return err("DataColumnSidecar: Length should not be 0") + + let start = Moment.now() + + let + columnCount = data_columns.len + blobCount = data_columns[0].column.len + + for data_column in data_columns: + if not (blobCount == data_column.column.len): + return err ("DataColumns do not have the same length") + + var + recovered_cps = + newSeq[CellsAndProofs](blobCount) + + for blobIdx in 0..= blobs.len: - return err("BlobSidecar: response too short") + break let blob_sidecar = blobs[blob_cursor] if blob_sidecar.index != BlobIndex blob_idx: return err("BlobSidecar: unexpected index") @@ -286,6 +287,8 @@ func groupBlobs*( grouped[block_idx].add(blob_sidecar) inc blob_cursor + # TODO verify the at least first blob-carrying-block condition + if blob_cursor != len(blobs): # we reached end of blocks without consuming all blobs so either # the peer we got too few blocks in the paired request, or the diff --git a/beacon_chain/sync/validator_custody.nim b/beacon_chain/sync/validator_custody.nim new file mode 100644 index 0000000000..3a9eb59638 --- /dev/null +++ b/beacon_chain/sync/validator_custody.nim @@ -0,0 +1,236 @@ 
+# beacon_chain +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import std/sets +import chronos, chronicles +import ssz_serialization/[proofs, types] +import + ../validators/action_tracker, + ../spec/[beaconstate, forks, network, helpers, peerdas_helpers], + ../networking/eth2_network, + ../consensus_object_pools/blockchain_dag, + ../consensus_object_pools/block_dag, + ../consensus_object_pools/blob_quarantine, + "."/[request_manager, sync_manager, sync_protocol] + +from std/algorithm import sort +from std/sequtils import toSeq +from ../beacon_clock import GetBeaconTimeFn + +logScope: topics = "validator_custody" + +const + PARALLEL_REFILL_REQUESTS = 32 + VALIDATOR_CUSTODY_POLL_INTERVAL = 384.seconds + +type + ValidatorCustody* = object + network*: Eth2Node + dag*: ChainDAGRef + supernode*: bool + getLocalHeadSlot*: GetSlotCallback + older_column_set: HashSet[ColumnIndex] + newer_column_set*: HashSet[ColumnIndex] + diff_set*: seq[ColumnIndex] + global_refill_list: HashSet[DataColumnIdentifier] + requested_columns: seq[DataColumnsByRootIdentifier] + getBeaconTime: GetBeaconTimeFn + dataColumnQuarantine: ref ColumnQuarantine + validatorCustodyLoopFuture: Future[void].Raising([CancelledError]) + + ValidatorCustodyRef* = ref ValidatorCustody + +proc init*(T: type ValidatorCustodyRef, network: Eth2Node, + dag: ChainDAGRef, + supernode: bool, + getLocalHeadSlotCb: GetSlotCallback, + older_column_set: HashSet[ColumnIndex], + getBeaconTime: GetBeaconTimeFn, + dataColumnQuarantine: ref ColumnQuarantine): ValidatorCustodyRef = + (ValidatorCustodyRef)( + network: network, + dag: dag, + supernode: supernode, + getLocalHeadSlot: getLocalHeadSlotCb, + older_column_set: older_column_set, + getBeaconTime: getBeaconTime, + dataColumnQuarantine: dataColumnQuarantine) + +proc detectNewValidatorCustody*(vcus: ValidatorCustodyRef, + total_node_balance: Gwei): seq[ColumnIndex] = + var + diff_set: HashSet[ColumnIndex] + debug "Total node balance before applying validator custody", + total_node_balance = total_node_balance + let + vcustody = + vcus.dag.cfg.get_validators_custody_requirement(total_node_balance) + newer_columns = + vcus.dag.cfg.resolve_columns_from_custody_groups( + vcus.network.nodeId, + max(vcus.dag.cfg.SAMPLES_PER_SLOT.uint64, + vcustody)) + + debug "New validator custody count detected", + new_vcus_columns = newer_columns + # update data column quarantine custody requirements + vcus.dataColumnQuarantine[].custodyColumns = + newer_columns.toSeq() + sort(vcus.dataColumnQuarantine[].custodyColumns) + # check which custody set is larger + if newer_columns.len > vcus.older_column_set.len: + diff_set = newer_columns.difference(vcus.older_column_set) + vcus.diff_set = toSeq(diff_set) + vcus.newer_column_set = newer_columns + vcus.diff_set + +proc makeRefillList(vcus: ValidatorCustodyRef, diff: seq[ColumnIndex]) = + if vcus.global_refill_list.len > 0: + # There's already a batch of column refilling going on + # hence, no need to re-create this list. 
+ discard + else: + let + slot = vcus.getLocalHeadSlot() + # Make the earliest refilled slot go up to head because everything + behind it is currently undergoing excess column refilling. + vcus.dag.erSlot = slot + let dataColumnRefillEpoch = (slot.epoch - + vcus.dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS - 1) + var numberOfColumnEpochs = vcus.dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS.int + if slot.is_epoch() and dataColumnRefillEpoch >= vcus.dag.cfg.FULU_FORK_EPOCH: + var blocks = newSeq[BlockId](numberOfColumnEpochs) + let startIndex = vcus.dag.getBlockRange( + dataColumnRefillEpoch.start_slot, blocks.toOpenArray(0, numberOfColumnEpochs - 1)) + for i in startIndex.. 0: + debug "Requesting data columns by root for refill", peer = peer, + columns = shortLog(colIdList), peer_score = peer.getScore() + let columns = + await dataColumnSidecarsByRoot(peer, DataColumnsByRootIdentifierList colIdList) + if columns.isOk: + var ucolumns = columns.get().asSeq() + ucolumns.sort(cmpSidecarIndexes) + let records = checkColumnResponse(colIdList, ucolumns).valueOr: + debug "Response to columns by root is not a subset", + peer = peer, columns = shortLog(colIdList), ucolumns = len(ucolumns) + peer.updateScore(PeerScoreBadResponse) + return + for col in records: + let + block_root = + hash_tree_root(col.block_root) + exclude = + DataColumnIdentifier(block_root: block_root, + index: col.sidecar.index) + vcus.global_refill_list.excl(exclude) + # write new columns to the database, no need for BlockVerifier + # in this scenario, as the columns historically passed DA + # and met the historical custody requirements. + vcus.dag.db.putDataColumnSidecar(col.sidecar[]) + # Update the earliest refilled slot as we scan through + # the received array of DataColumnSidecars + vcus.dag.erSlot = + col.sidecar[].signed_block_header.message.slot + else: + debug "Data columns by root request not done, peer doesn't have custody column", + peer = peer, columns = shortLog(colIdList), err = columns.error() + peer.updateScore(PeerScoreNoValues) + finally: + if not(isNil(peer)): + vcus.network.peerPool.release(peer) + +proc validatorCustodyColumnLoop( + vcus: ValidatorCustodyRef) {.async: (raises: [CancelledError]).} = + while true: + await sleepAsync(VALIDATOR_CUSTODY_POLL_INTERVAL) + if vcus.diff_set.len != 0: + info "Initiating validator custody column backfill jobs" + vcus.makeRefillList(vcus.diff_set) + if vcus.global_refill_list.len != 0: + debug "Requesting detected missing data columns for refill", + columns = shortLog(vcus.requested_columns) + let start = SyncMoment.now(0) + var workers: + array[PARALLEL_REFILL_REQUESTS, Future[void].Raising([CancelledError])] + for i in 0..= ConsensusFork.Deneb: + consensusBlockValue: val.rewards.blockConsensusValue(), + blobsBundle: ( + when typeof(payload).kind in [ConsensusFork.Deneb, ConsensusFork.Electra]: payload.blobsBundle else: default(deneb.BlobsBundle) + ), + blobsBundleV2: ( + when typeof(payload).kind >= ConsensusFork.Fulu: + payload.blobsBundle + else: + default(fulu.BlobsBundleV2) + ) )) else: err(res.error) @@ -920,14 +930,45 @@ proc getBuilderBid[ proc proposeBlockMEV( node: BeaconNode, payloadBuilderClient: RestClientRef, blindedBlock: - electra_mev.SignedBlindedBeaconBlock | + electra_mev.SignedBlindedBeaconBlock): + Future[Result[Opt[BlockRef], string]] {.async: (raises: [CancelledError]).} = + let unblindedBlockRef = await node.unblindAndRouteBlockMEV( + payloadBuilderClient, blindedBlock) + if unblindedBlockRef.isOk and unblindedBlockRef.get.isSome: +
beacon_blocks_proposed.inc() + return ok(unblindedBlockRef.get) + else: + # unblindedBlockRef.isOk and unblindedBlockRef.get.isNone indicates that + # the block failed to validate and integrate into the DAG, which for the + # purpose of this return value, is equivalent. It's used to drive Beacon + # REST API output. + # + # https://collective.flashbots.net/t/post-mortem-april-3rd-2023-mev-boost-relay-incident-and-related-timing-issue/1540 + # has caused false positives, because + # "A potential mitigation to this attack is to introduce a cutoff timing + # into the proposer's slot whereafter this time (e.g. 3 seconds) the relay + # will no longer return a block to the proposer. Relays began to roll out + # this mitigation in the evening of April 3rd UTC time with a 2 second + # cutoff, and notified other relays to do the same. After receiving + # credible reports of honest validators missing their slots the suggested + # timing cutoff was increased to 3 seconds." + let errMsg = + if unblindedBlockRef.isErr: + unblindedBlockRef.error + else: + "Unblinded block not returned to proposer" + err errMsg + +proc proposeBlockMEV( + node: BeaconNode, payloadBuilderClient: RestClientRef, + blindedBlock: fulu_mev.SignedBlindedBeaconBlock): - Future[Result[BlockRef, string]] {.async: (raises: [CancelledError]).} = + Future[Result[Opt[BlockRef], string]] {.async: (raises: [CancelledError]).} = let unblindedBlockRef = await node.unblindAndRouteBlockMEV( payloadBuilderClient, blindedBlock) - return if unblindedBlockRef.isOk and unblindedBlockRef.get.isSome: + if unblindedBlockRef.isOk: beacon_blocks_proposed.inc() - ok(unblindedBlockRef.get.get) + return ok(Opt.none(BlockRef)) else: # unblindedBlockRef.isOk and unblindedBlockRef.get.isNone indicates that # the block failed to validate and integrate into the DAG, which for the @@ -1090,7 +1131,7 @@ proc proposeBlockAux( head: BlockRef, slot: Slot, randao: ValidatorSig, fork: Fork, genesis_validators_root: Eth2Digest, localBlockValueBoost: uint8 -): Future[BlockRef] {.async: (raises: [CancelledError]).} = +): Future[Opt[BlockRef]] {.async: (raises: [CancelledError]).} = let boostFactor = BoostFactor.init(localBlockValueBoost) graffitiBytes = node.getGraffitiBytes(validator) @@ -1109,7 +1150,7 @@ proc proposeBlockAux( collectedBids.engineBid.value().executionPayloadValue) else: if not collectedBids.engineBid.isSome(): - return head # errors logged in router + return Opt.some(head) # errors logged in router false # There should always be an engine bid, and if payloadBuilderClient exists, @@ -1150,20 +1191,25 @@ proc proposeBlockAux( blindedBlock = (await blindedBlockCheckSlashingAndSign( node, slot, validator, validator_index, collectedBids.builderBid.value().blindedBlckPart)).valueOr: - return head + return Opt.some(head) # Before proposeBlockMEV, can fall back to EL; after, cannot without # risking slashing. 
maybeUnblindedBlock = await proposeBlockMEV( node, payloadBuilderClient, blindedBlock) - return maybeUnblindedBlock.valueOr: + if maybeUnblindedBlock.isOk(): + if maybeUnblindedBlock.get.isSome(): + return maybeUnblindedBlock.get + else: + return Opt.none(BlockRef) + else: warn "Blinded block proposal incomplete", head = shortLog(head), slot, validator_index, validator = shortLog(validator), err = maybeUnblindedBlock.error, blindedBlck = shortLog(blindedBlock) beacon_block_builder_missed_without_fallback.inc() - return head + return Opt.some(head) let engineBid = collectedBids.engineBid.value() @@ -1184,7 +1230,7 @@ proc proposeBlockAux( validator = validator.pubkey, slot = slot, existingProposal = notSlashable.error - return head + return Opt.some(head) let signature = @@ -1194,24 +1240,32 @@ proc proposeBlockAux( if res.isErr(): warn "Unable to sign block", validator = shortLog(validator), error_msg = res.error() - return head + return Opt.some(head) res.get() signedBlock = consensusFork.SignedBeaconBlock( message: forkyBlck, signature: signature, root: blockRoot) blobsOpt = - when consensusFork >= ConsensusFork.Deneb: + when consensusFork in [ConsensusFork.Deneb, ConsensusFork.Electra]: Opt.some(signedBlock.create_blob_sidecars( - engineBid.blobsBundle.proofs, engineBid.blobsBundle.blobs)) + KzgProofs(engineBid.blobsBundle.proofs), engineBid.blobsBundle.blobs)) else: Opt.none(seq[BlobSidecar]) + columnsOpt = + when consensusFork >= ConsensusFork.Fulu: + Opt.some(signedBlock.assemble_data_column_sidecars( + engineBid.blobsBundleV2.blobs.mapIt(kzg.KzgBlob(bytes: it)), + @(engineBid.blobsBundleV2.proofs.mapIt(kzg.KzgProof(it))))) + else: + Opt.none(seq[DataColumnSidecar]) + newBlockRef = ( await node.router.routeSignedBeaconBlock(signedBlock, blobsOpt, - checkValidator = false) + columnsOpt, checkValidator = false) ).valueOr: - return head # Errors logged in router + return Opt.some(head) # Errors logged in router if newBlockRef.isNone(): - return head # Validation errors logged in router + return Opt.some(head) # Validation errors logged in router notice "Block proposed", blockRoot = shortLog(blockRoot), blck = shortLog(forkyBlck), @@ -1219,7 +1273,7 @@ proc proposeBlockAux( beacon_blocks_proposed.inc() - return newBlockRef.get() + return newBlockRef proc proposeBlock( node: BeaconNode, @@ -1227,7 +1281,7 @@ proc proposeBlock( validator_index: ValidatorIndex, head: BlockRef, slot: Slot -): Future[BlockRef] {.async: (raises: [CancelledError]).} = +): Future[Opt[BlockRef]] {.async: (raises: [CancelledError]).} = if head.slot >= slot: # We should normally not have a head newer than the slot we're proposing for # but this can happen if block proposal is delayed @@ -1235,7 +1289,7 @@ proc proposeBlock( headSlot = shortLog(head.slot), headBlockRoot = shortLog(head.root), slot = shortLog(slot) - return head + return Opt.some(head) let fork = node.dag.forkAtEpoch(slot.epoch) @@ -1246,7 +1300,7 @@ proc proposeBlock( if res.isErr(): warn "Unable to generate randao reveal", validator = shortLog(validator), error_msg = res.error() - return head + return Opt.some(head) res.get() template proposeBlockContinuation(type1, type2: untyped): auto = @@ -1472,21 +1526,21 @@ proc sendSyncCommitteeContributions( node, validator, subcommitteeIdx, head, slot) proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot): - Future[BlockRef] {.async: (raises: [CancelledError]).} = + Future[Opt[BlockRef]] {.async: (raises: [CancelledError]).} = ## Perform the proposal for the given slot, iff we have a validator 
attached ## that is supposed to do so, given the shuffling at that slot for the given ## head - to compute the proposer, we need to advance a state to the given ## slot let proposer = node.dag.getProposer(head, slot).valueOr: - return head + return Opt.some(head) proposerKey = node.dag.validatorKey(proposer).get().toPubKey validator = node.getValidatorForDuties(proposer, slot).valueOr: debug "Expecting block proposal", headRoot = shortLog(head.root), slot = shortLog(slot), proposer_index = proposer, proposer = shortLog(proposerKey) - return head + return Opt.some(head) return await proposeBlock(node, validator, proposer, head, slot) @@ -1871,7 +1925,8 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async: (ra node.updateValidators(forkyState.data.validators.asSeq()) let newHead = await handleProposal(node, head, slot) - head = newHead + if newHead.isSome(): + head = newHead.get let # The latest point in time when we'll be sending out attestations @@ -2007,7 +2062,19 @@ proc makeMaybeBlindedBeaconBlockForHeadAndSlotImpl[ResultType]( doAssert engineBid.blck.kind == consensusFork template forkyBlck: untyped = engineBid.blck.forky(consensusFork) - when consensusFork >= ConsensusFork.Deneb: + when consensusFork >= ConsensusFork.Fulu: + doAssert engineBid.blobsBundleV2.commitments == + forkyBlck.body.blob_kzg_commitments + ResultType.ok(( + blck: consensusFork.MaybeBlindedBeaconBlock( + isBlinded: false, + data: consensusFork.BlockContents( + `block`: forkyBlck, + kzg_proofs: KzgProofsV2(engineBid.blobsBundleV2.proofs), + blobs: engineBid.blobsBundleV2.blobs)), + executionValue: Opt.some(engineBid.executionPayloadValue), + consensusValue: Opt.some(engineBid.consensusBlockValue))) + elif consensusFork >= ConsensusFork.Deneb: doAssert engineBid.blobsBundle.commitments == forkyBlck.body.blob_kzg_commitments ResultType.ok(( @@ -2015,7 +2082,7 @@ proc makeMaybeBlindedBeaconBlockForHeadAndSlotImpl[ResultType]( isBlinded: false, data: consensusFork.BlockContents( `block`: forkyBlck, - kzg_proofs: engineBid.blobsBundle.proofs, + kzg_proofs: KzgProofs(engineBid.blobsBundle.proofs), blobs: engineBid.blobsBundle.blobs)), executionValue: Opt.some(engineBid.executionPayloadValue), consensusValue: Opt.some(engineBid.consensusBlockValue))) diff --git a/beacon_chain/validators/message_router.nim b/beacon_chain/validators/message_router.nim index 31c9fca380..04f4c8b042 100644 --- a/beacon_chain/validators/message_router.nim +++ b/beacon_chain/validators/message_router.nim @@ -12,11 +12,13 @@ import chronicles, metrics, ../spec/network, + ../spec/peerdas_helpers, ../consensus_object_pools/spec_cache, ../gossip_processing/eth2_processor, ../networking/eth2_network, ./activity_metrics, - ../spec/datatypes/deneb + ../spec/datatypes/[deneb, fulu] + from ../spec/state_transition_block import validate_blobs export eth2_processor, eth2_network @@ -84,8 +86,8 @@ template getCurrentBeaconTime(router: MessageRouter): BeaconTime = type RouteBlockResult = Result[Opt[BlockRef], string] proc routeSignedBeaconBlock*( router: ref MessageRouter, blck: ForkySignedBeaconBlock, - blobsOpt: Opt[seq[BlobSidecar]], checkValidator: bool): - Future[RouteBlockResult] {.async: (raises: [CancelledError]).} = + blobsOpt: Opt[seq[BlobSidecar]], dataColumnsOpt: Opt[seq[DataColumnSidecar]], + checkValidator: bool): Future[RouteBlockResult] {.async: (raises: [CancelledError]).} = ## Validate and broadcast beacon block, then add it to the block database ## Returns the new Head when block is added successfully to dag, none 
when ## block passes validation but is not added, and error otherwise @@ -112,7 +114,7 @@ proc routeSignedBeaconBlock*( signature = shortLog(blck.signature), error = res.error() return err($(res.error()[1])) - when typeof(blck).kind >= ConsensusFork.Deneb: + when typeof(blck).kind in [ConsensusFork.Deneb, ConsensusFork.Electra]: if blobsOpt.isSome: let blobs = blobsOpt.get() let kzgCommits = blck.message.body.blob_kzg_commitments.asSeq @@ -153,27 +155,68 @@ proc routeSignedBeaconBlock*( signature = shortLog(blck.signature), error = res.error() var blobRefs = Opt.none(BlobSidecars) - if blobsOpt.isSome(): - let blobs = blobsOpt.get() - var workers = newSeq[Future[SendResult]](blobs.len) - for i in 0..= ConsensusFork.Fulu: + if dataColumnsOpt.isSome() and dataColumnsOpt.get().len != 0: + let dataColumns = dataColumnsOpt.get() + var das_workers = + newSeq[Future[SendResult]](len(dataColumns)) + for i in 0..= ConsensusFork.Deneb: + when consensusFork in [ConsensusFork.Deneb, ConsensusFork.Electra]: let blobs = signedBlock.create_blob_sidecars( payload.blobsBundle.proofs, payload.blobsBundle.blobs) for blob in blobs: diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index 553491b11e..2bd1931475 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -14,8 +14,9 @@ import unittest2, taskpools, ../beacon_chain/conf, - ../beacon_chain/spec/[beaconstate, forks, helpers, state_transition], - ../beacon_chain/spec/datatypes/deneb, + ../beacon_chain/spec/[ + beaconstate, forks, helpers, state_transition], + ../beacon_chain/spec/datatypes/[deneb, fulu], ../beacon_chain/gossip_processing/block_processor, ../beacon_chain/consensus_object_pools/[ attestation_pool, blockchain_dag, blob_quarantine, block_quarantine, @@ -49,6 +50,7 @@ suite "Block processor" & preset(): taskpool = Taskpool.new() quarantine = newClone(Quarantine.init(cfg)) blobQuarantine = newClone(BlobQuarantine()) + dataColumnQuarantine = newClone(ColumnQuarantine()) attestationPool = newClone(AttestationPool.init(dag, quarantine)) elManager = new ELManager # TODO: initialise this properly actionTracker: ActionTracker @@ -68,14 +70,15 @@ suite "Block processor" & preset(): batchVerifier = BatchVerifier.new(rng, taskpool) processor = BlockProcessor.new( false, "", "", batchVerifier, consensusManager, - validatorMonitor, blobQuarantine, getTimeFn) + validatorMonitor, blobQuarantine, dataColumnQuarantine, + getTimeFn) discard processor.runQueueProcessingLoop() asyncTest "Reverse order block add & get" & preset(): let missing = await processor[].addBlock( MsgSource.gossip, ForkedSignedBeaconBlock.init(b2), - Opt.none(BlobSidecars)) + Opt.none(BlobSidecars), Opt.none(DataColumnSidecars)) check: missing.error == VerifierError.MissingParent @@ -87,7 +90,7 @@ suite "Block processor" & preset(): let status = await processor[].addBlock( MsgSource.gossip, ForkedSignedBeaconBlock.init(b1), - Opt.none(BlobSidecars)) + Opt.none(BlobSidecars), Opt.none(DataColumnSidecars)) b1Get = dag.getBlockRef(b1.root) check: @@ -133,15 +136,15 @@ suite "Block processor" & preset(): let processor = BlockProcessor.new( false, "", "", batchVerifier, consensusManager, - validatorMonitor, blobQuarantine, getTimeFn, - invalidBlockRoots = @[b2.root]) + validatorMonitor, blobQuarantine, dataColumnQuarantine, + getTimeFn, invalidBlockRoots = @[b2.root]) processorFut = processor.runQueueProcessingLoop() defer: await processorFut.cancelAndWait() block: let res = await processor[].addBlock( MsgSource.gossip,
ForkedSignedBeaconBlock.init(b2), - Opt.none(BlobSidecars)) + Opt.none(BlobSidecars), Opt.none(DataColumnSidecars)) check: res.isErr not dag.containsForkBlock(b1.root) @@ -150,7 +153,7 @@ suite "Block processor" & preset(): block: let res = await processor[].addBlock( MsgSource.gossip, ForkedSignedBeaconBlock.init(b1), - Opt.none(BlobSidecars)) + Opt.none(BlobSidecars), Opt.none(DataColumnSidecars)) check: res.isOk dag.containsForkBlock(b1.root) @@ -164,7 +167,7 @@ suite "Block processor" & preset(): block: let res = await processor[].addBlock( MsgSource.gossip, ForkedSignedBeaconBlock.init(b2), - Opt.none(BlobSidecars)) + Opt.none(BlobSidecars), Opt.none(DataColumnSidecars)) check: res == Result[void, VerifierError].err VerifierError.Invalid dag.containsForkBlock(b1.root) diff --git a/tests/test_discovery.nim b/tests/test_discovery.nim index 1e9333c28a..59be139539 100644 --- a/tests/test_discovery.nim +++ b/tests/test_discovery.nim @@ -235,7 +235,7 @@ suite "Discovery fork ID": forkId = ENRForkID( fork_digest: fork_digest, next_fork_version: next_fork_version, - next_fork_epoch: FAR_FUTURE_EPOCH) + next_fork_epoch: cfg.ALTAIR_FORK_EPOCH) for epoch in GENESIS_EPOCH ..< cfg.ALTAIR_FORK_EPOCH - 1: check cfg.getDiscoveryForkID(epoch, genesis_validators_root) == forkId forkId @@ -265,7 +265,7 @@ suite "Discovery fork ID": forkId = ENRForkID( fork_digest: fork_digest, next_fork_version: next_fork_version, - next_fork_epoch: FAR_FUTURE_EPOCH) + next_fork_epoch: cfg.BELLATRIX_FORK_EPOCH) for epoch in cfg.ALTAIR_FORK_EPOCH ..< cfg.BELLATRIX_FORK_EPOCH - 1: check cfg.getDiscoveryForkID(epoch, genesis_validators_root) == forkId forkId
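
Editor's illustrative sketch (not part of the patch): the getDiscoveryForkID change earlier in this diff sets next_fork_epoch to cfg.nextForkEpochAtEpoch(epoch) instead of FAR_FUTURE_EPOCH, which is what the two updated test_discovery expectations above encode -- before the last scheduled fork, the ENR advertises the epoch of the next fork. The standalone Nim sketch below shows that selection rule under an assumed two-fork schedule; MiniForkSchedule and nextForkEpochSketch are hypothetical stand-ins, not the real RuntimeConfig helper.

# Minimal sketch, assuming a schedule with only Altair and Bellatrix configured.
type
  Epoch = uint64
  MiniForkSchedule = object
    altairForkEpoch: Epoch
    bellatrixForkEpoch: Epoch

const FAR_FUTURE_EPOCH = Epoch(high(uint64))

func nextForkEpochSketch(cfg: MiniForkSchedule, epoch: Epoch): Epoch =
  ## Epoch of the next scheduled fork after `epoch`, or FAR_FUTURE_EPOCH
  ## when no further fork is known (mirrors the updated test expectations).
  if epoch < cfg.altairForkEpoch:
    cfg.altairForkEpoch
  elif epoch < cfg.bellatrixForkEpoch:
    cfg.bellatrixForkEpoch
  else:
    FAR_FUTURE_EPOCH

when isMainModule:
  let cfg = MiniForkSchedule(altairForkEpoch: Epoch(10), bellatrixForkEpoch: Epoch(20))
  doAssert cfg.nextForkEpochSketch(Epoch(0)) == Epoch(10)         # pre-Altair: next fork is Altair
  doAssert cfg.nextForkEpochSketch(Epoch(15)) == Epoch(20)        # Altair era: next fork is Bellatrix
  doAssert cfg.nextForkEpochSketch(Epoch(25)) == FAR_FUTURE_EPOCH # past the last known fork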