diff --git a/portal/network/history/content/content_keys.nim b/portal/network/history/content/content_keys.nim new file mode 100644 index 0000000000..606d010fad --- /dev/null +++ b/portal/network/history/content/content_keys.nim @@ -0,0 +1,110 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import results, stint, ssz_serialization, ../../../common/common_types + +export ssz_serialization, common_types, results + +type + ContentType* = enum + blockBody = 0x00 + receipts = 0x01 + + BlockNumberKey* = object + blockNumber*: uint64 + + ContentKey* = object + case contentType*: ContentType + of blockBody: + blockBodyKey*: BlockNumberKey + of receipts: + receiptsKey*: BlockNumberKey + +func blockBodyContentKey*(blockNumber: uint64): ContentKey = + ContentKey( + contentType: blockBody, blockBodyKey: BlockNumberKey(blockNumber: blockNumber) + ) + +func receiptsContentKey*(blockNumber: uint64): ContentKey = + ContentKey( + contentType: receipts, receiptsKey: BlockNumberKey(blockNumber: blockNumber) + ) + +template blockNumber*(contentKey: ContentKey): uint64 = + ## Returns the block number for the given content key + case contentKey.contentType + of blockBody: contentKey.blockBodyKey.blockNumber + of receipts: contentKey.receiptsKey.blockNumber + +proc readSszBytes*(data: openArray[byte], val: var ContentKey) {.raises: [SszError].} = + mixin readSszValue + + readSszValue(data, val) + +func encode*(contentKey: ContentKey): ContentKeyByteList = + ContentKeyByteList.init(SSZ.encode(contentKey)) + +func decode*(contentKey: ContentKeyByteList): Opt[ContentKey] = + try: + Opt.some(SSZ.decode(contentKey.asSeq(), ContentKey)) + except SerializationError: + return Opt.none(ContentKey) + +func reverseBits(n: uint64, width: int): uint64 = + ## Reverse the lowest `width` bits of `n` + # TODO: can improve + var res: uint64 = 0 + for i in 0 ..< width: + if ((n shr i) and 1) != 0: + res = res or (1'u64 shl (width - 1 - i)) + res + +const + CYCLE_BITS = 16 + OFFSET_BITS = 256 - CYCLE_BITS # 240 + REVERSED_OFFSET_BITS = 64 - CYCLE_BITS # 48 + +func toContentId*(blockNumber: uint64, contentType: ContentType): UInt256 = + ## Returns the content id for a given block number + let + cycleBits = blockNumber mod (1'u64 shl CYCLE_BITS) + offsetBits = blockNumber div (1'u64 shl CYCLE_BITS) + + reversedOffsetBits = reverseBits(offsetBits, REVERSED_OFFSET_BITS) + + (cycleBits.stuint(256) shl OFFSET_BITS) or + (reversedOffsetBits.stuint(256) shl (OFFSET_BITS - REVERSED_OFFSET_BITS)) or + ord(contentType).stuint(256) + +func toContentId*(contentKey: ContentKey): ContentId = + case contentKey.contentType + of blockBody: + toContentId(contentKey.blockBodyKey.blockNumber, contentKey.contentType) + of receipts: + toContentId(contentKey.receiptsKey.blockNumber, contentKey.contentType) + +func toContentId*(bytes: ContentKeyByteList): Opt[ContentId] = + let contentKey = ?bytes.decode() + Opt.some(contentKey.toContentId()) + +func `$`*(x: BlockNumberKey): string = + "block_number: " & $x.blockNumber + +func `$`*(x: ContentKey): string = + var res = "(type: " & $x.contentType & ", " + + case x.contentType + of blockBody: + res.add($x.blockBodyKey) + of receipts: + res.add($x.receiptsKey) + + res.add(")") + + res diff --git a/portal/network/history/content/content_values.nim b/portal/network/history/content/content_values.nim new file mode 100644 index 0000000000..0951b45c4e --- /dev/null +++ b/portal/network/history/content/content_values.nim @@ -0,0 +1,16 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import eth/common/blocks_rlp, eth/common/receipts_rlp + +export blocks_rlp, receipts_rlp + +type + StoredReceipts* = seq[StoredReceipt] + ContentValueType* = BlockBody | StoredReceipts diff --git a/portal/network/history/history_content.nim b/portal/network/history/history_content.nim new file mode 100644 index 0000000000..461102cca8 --- /dev/null +++ b/portal/network/history/history_content.nim @@ -0,0 +1,12 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import ./content/content_keys, ./content/content_values + +export content_keys, content_values diff --git a/portal/network/history/history_endpoints.nim b/portal/network/history/history_endpoints.nim new file mode 100644 index 0000000000..9bb05f06cd --- /dev/null +++ b/portal/network/history/history_endpoints.nim @@ -0,0 +1,22 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import results, chronicles, chronos, ./history_network + +export results, history_network + +proc getBlockBody*( + n: HistoryNetwork, header: Header +): Future[Opt[BlockBody]] {.async: (raises: [CancelledError], raw: true).} = + n.getContent(blockBodyContentKey(header.number), BlockBody, header) + +proc getReceipts*( + n: HistoryNetwork, header: Header +): Future[Opt[StoredReceipts]] {.async: (raises: [CancelledError], raw: true).} = + n.getContent(receiptsContentKey(header.number), StoredReceipts, header) diff --git a/portal/network/history/history_network.nim b/portal/network/history/history_network.nim new file mode 100644 index 0000000000..c24f8c835a --- /dev/null +++ b/portal/network/history/history_network.nim @@ -0,0 +1,270 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + results, + chronos, + chronicles, + metrics, + eth/common/headers, + eth/p2p/discoveryv5/[protocol, enr], + ../../common/common_types, + ../../database/content_db, + # ../network_metadata, + ../wire/[portal_protocol, portal_stream, portal_protocol_config, ping_extensions], + "."/[history_content, history_validation] + +from eth/common/accounts import EMPTY_ROOT_HASH + +export history_content, headers + +logScope: + topics = "portal_fin_hist" + +const pingExtensionCapabilities = {CapabilitiesType, HistoryRadiusType} + +type + GetHeaderCallback* = proc(blockNumber: uint64): Future[Result[Header, string]] {. + async: (raises: [CancelledError]), gcsafe + .} + + HistoryNetwork* = ref object + portalProtocol*: PortalProtocol + contentDB*: ContentDB + contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])] + getHeader*: GetHeaderCallback + # cfg*: RuntimeConfig + processContentLoops: seq[Future[void]] + statusLogLoop: Future[void] + contentRequestRetries: int + contentQueueWorkers: int + +proc defaultNoGetHeader*( + blockNumber: uint64 +): Future[Result[Header, string]] {.async: (raises: [CancelledError]), gcsafe.} = + err( + "No getHeader callback set for PortalNode, cannot verify offered content for block: " & + $blockNumber + ) + +func toContentIdHandler(contentKey: ContentKeyByteList): results.Opt[ContentId] = + toContentId(contentKey) + +proc new*( + T: type HistoryNetwork, + portalNetwork: PortalNetwork, + baseProtocol: protocol.Protocol, + contentDB: ContentDB, + streamManager: StreamManager, + getHeaderCallback: GetHeaderCallback = defaultNoGetHeader, + # cfg: RuntimeConfig, + bootstrapRecords: openArray[Record] = [], + portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig, + contentRequestRetries = 1, + contentQueueWorkers = 50, + contentQueueSize = 50, +): T = + let + contentQueue = + newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](contentQueueSize) + + stream = streamManager.registerNewStream(contentQueue) + + portalProtocol = PortalProtocol.new( + baseProtocol, + getProtocolId(portalNetwork, PortalSubnetwork.history), + toContentIdHandler, + createGetHandler(contentDB), + createStoreHandler(contentDB, portalConfig.radiusConfig), + createContainsHandler(contentDB), + createRadiusHandler(contentDB), + stream, + bootstrapRecords, + config = portalConfig, + pingExtensionCapabilities = pingExtensionCapabilities, + ) + + HistoryNetwork( + portalProtocol: portalProtocol, + contentDB: contentDB, + contentQueue: contentQueue, + getHeader: getHeaderCallback, + # cfg: cfg, + contentRequestRetries: contentRequestRetries, + contentQueueWorkers: contentQueueWorkers, + ) + +func localNode*(n: HistoryNetwork): Node = + n.portalProtocol.localNode() + +proc getContent*( + n: HistoryNetwork, contentKey: ContentKey, V: type ContentValueType, header: Header +): Future[Opt[V]] {.async: (raises: [CancelledError]).} = + ## Get the decoded content for the given content key. + ## + ## When the content is found locally, no validation is performed. + ## Else, the content is fetched from the network and validated with the provided header. + ## If content validation fails, the node is banned for a certain period of time and + ## content request is retried `contentRequestRetries` times. + let contentKeyBytes = encode(contentKey) + + logScope: + contentKeyBytes + + let contentId = contentKeyBytes.toContentId().valueOr: + warn "Received invalid content key", contentKeyBytes + return Opt.none(V) + + # Check first locally + n.portalProtocol.getLocalContent(contentKeyBytes, contentId).isErrOr: + let contentValue = decodeRlp(value(), V).valueOr: + raiseAssert("Unable to decode history local content value") + + debug "Fetched local content value" + return Opt.some(contentValue) + + for i in 0 ..< (1 + n.contentRequestRetries): + let + lookupRes = (await n.portalProtocol.contentLookup(contentKeyBytes, contentId)).valueOr: + warn "Failed fetching content from the network" + return Opt.none(V) + + contentValue = decodeRlp(lookupRes.content, V).valueOr: + warn "Unable to decode content value from content lookup" + continue + + validateContent(contentValue, header).isOkOr: + n.portalProtocol.banNode( + lookupRes.receivedFrom.id, NodeBanDurationContentLookupFailedValidation + ) + warn "Error validating retrieved content", error = error + continue + + debug "Fetched valid content from the network" + n.portalProtocol.storeContent( + contentKeyBytes, contentId, lookupRes.content, cacheContent = true + ) + + asyncSpawn n.portalProtocol.triggerPoke( + lookupRes.nodesInterestedInContent, contentKeyBytes, lookupRes.content + ) + + return Opt.some(contentValue) + + # Content was requested `1 + requestRetries` times and all failed on validation + Opt.none(V) + +proc validateContent( + n: HistoryNetwork, content: seq[byte], contentKeyBytes: ContentKeyByteList +): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = + # TODO: specs might turn out to just disable offers. Although I think for for getting initial data in the network + # this might be an issue. Unless history expiry gets deployed together with Portal. + let contentKey = history_content.decode(contentKeyBytes).valueOr: + return err("Error decoding content key") + + let header = ?(await n.getHeader(contentKey.blockNumber())) + + case contentKey.contentType + of blockBody: + let blockBody = decodeRlp(content, BlockBody).valueOr: + return err("Error decoding block body: " & error) + validateBlockBody(blockBody, header).isOkOr: + return err("Failed validating block body: " & error) + + ok() + of receipts: + let receipts = decodeRlp(content, StoredReceipts).valueOr: + return err("Error decoding receipts: " & error) + validateReceipts(receipts, header).isOkOr: + return err("Failed validating receipts: " & error) + + ok() + +proc validateContent( + n: HistoryNetwork, + srcNodeId: Opt[NodeId], + contentKeys: ContentKeysList, + contentItems: seq[seq[byte]], +): Future[bool] {.async: (raises: [CancelledError]).} = + # content passed here can have less items then contentKeys, but not more. + for i, contentItem in contentItems: + let contentKey = contentKeys[i] + let res = await n.validateContent(contentItem, contentKey) + if res.isOk(): + let contentId = n.portalProtocol.toContentId(contentKey).valueOr: + warn "Received offered content with invalid content key", srcNodeId, contentKey + return false + + n.portalProtocol.storeContent( + contentKey, contentId, contentItem, cacheOffer = true + ) + + debug "Received offered content validated successfully", srcNodeId, contentKey + else: + if srcNodeId.isSome(): + n.portalProtocol.banNode(srcNodeId.get(), NodeBanDurationOfferFailedValidation) + + debug "Received offered content failed validation", + srcNodeId, contentKey, error = res.error + return false + + return true + +proc contentQueueWorker(n: HistoryNetwork) {.async: (raises: []).} = + try: + while true: + let (srcNodeId, contentKeys, contentItems) = await n.contentQueue.popFirst() + + if await n.validateContent(srcNodeId, contentKeys, contentItems): + portal_offer_validation_successful.inc( + labelValues = [$n.portalProtocol.protocolId] + ) + + discard await n.portalProtocol.neighborhoodGossip( + srcNodeId, contentKeys, contentItems + ) + else: + portal_offer_validation_failed.inc(labelValues = [$n.portalProtocol.protocolId]) + except CancelledError: + trace "contentQueueWorker canceled" + +proc statusLogLoop(n: HistoryNetwork) {.async: (raises: []).} = + try: + while true: + await sleepAsync(60.seconds) + + info "History network status", + routingTableNodes = n.portalProtocol.routingTable.len() + except CancelledError: + trace "statusLogLoop canceled" + +proc start*(n: HistoryNetwork) = + info "Starting Portal chain history network", protocolId = n.portalProtocol.protocolId + + n.portalProtocol.start() + + for i in 0 ..< n.contentQueueWorkers: + n.processContentLoops.add(contentQueueWorker(n)) + + n.statusLogLoop = statusLogLoop(n) + +proc stop*(n: HistoryNetwork) {.async: (raises: []).} = + info "Stopping Portal chain history network" + + var futures: seq[Future[void]] + futures.add(n.portalProtocol.stop()) + + for loop in n.processContentLoops: + futures.add(loop.cancelAndWait()) + if not n.statusLogLoop.isNil: + futures.add(n.statusLogLoop.cancelAndWait()) + await noCancel(allFutures(futures)) + + n.processContentLoops.setLen(0) + n.statusLogLoop = nil diff --git a/portal/network/history/history_validation.nim b/portal/network/history/history_validation.nim new file mode 100644 index 0000000000..b2e397c509 --- /dev/null +++ b/portal/network/history/history_validation.nim @@ -0,0 +1,86 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + eth/trie/ordered_trie, + eth/common/[headers_rlp, blocks_rlp, receipts, hashes], + ./history_content + +export history_content + +func validateBlockBody*(body: BlockBody, header: Header): Result[void, string] = + ## Validate the block body against the txRoot, ommersHash and withdrawalsRoot + ## from the header. + ## TODO: could add block number vs empty ommersHash + existing withdrawalsRoot check + let calculatedOmmersHash = keccak256(rlp.encode(body.uncles)) + # TODO: avoid having to re-encode the uncles + if calculatedOmmersHash != header.ommersHash: + return err( + "Invalid ommers hash: expected " & $header.ommersHash & " - got " & + $calculatedOmmersHash + ) + + let calculatedTxsRoot = orderedTrieRoot(body.transactions) + if calculatedTxsRoot != header.txRoot: + return err( + "Invalid transactions root: expected " & $header.txRoot & " - got " & + $calculatedTxsRoot + ) + + if header.withdrawalsRoot.isSome() and body.withdrawals.isNone() or + header.withdrawalsRoot.isNone() and body.withdrawals.isSome(): + return err("Invalid withdrawals") + + if header.withdrawalsRoot.isSome() and body.withdrawals.isSome(): + let + calculatedWithdrawalsRoot = orderedTrieRoot(body.withdrawals.value()) + headerWithdrawalsRoot = header.withdrawalsRoot.get() + if calculatedWithdrawalsRoot != headerWithdrawalsRoot: + return err( + "Invalid withdrawals root: expected " & $headerWithdrawalsRoot & " - got " & + $calculatedWithdrawalsRoot + ) + + ok() + +func validateReceipts*( + storedReceipts: StoredReceipts, header: Header +): Result[void, string] = + let receipts = storedReceipts.to(seq[Receipt]) + + ## Validate the receipts against the receiptsRoot from the header. + let calculatedReceiptsRoot = orderedTrieRoot(receipts) + if calculatedReceiptsRoot != header.receiptsRoot: + err( + "Unexpected receipt root: expected " & $header.receiptsRoot & " - got " & + $calculatedReceiptsRoot + ) + else: + ok() + +func validateContent*( + content: BlockBody | StoredReceipts, header: Header +): Result[void, string] = + type T = type(content) + when T is BlockBody: + validateBlockBody(content, header) + elif T is StoredReceipts: + validateReceipts(content, header) + +func validateContent*( + key: ContentKey, contentBytes: seq[byte], header: Header +): Result[void, string] = + ## Validate the encoded content against the header. + case key.contentType + of blockBody: + let content = ?decodeRlp(contentBytes, BlockBody) + validateBlockBody(content, header) + of receipts: + let content = ?decodeRlp(contentBytes, StoredReceipts) + validateReceipts(content, header) diff --git a/portal/network/portal_node.nim b/portal/network/portal_node.nim index 3952ab6d9d..054ec6bd8a 100644 --- a/portal/network/portal_node.nim +++ b/portal/network/portal_node.nim @@ -17,6 +17,7 @@ import ../database/content_db, ./network_metadata, ./wire/[portal_stream, portal_protocol_config], + ./history/history_network, ./beacon/[beacon_init_loader, beacon_light_client], ./legacy_history/[history_network, history_content] @@ -39,6 +40,7 @@ type discovery: protocol.Protocol contentDB: ContentDB streamManager: StreamManager + historyNetwork*: Opt[HistoryNetwork] beaconNetwork*: Opt[BeaconNetwork] legacyHistoryNetwork*: Opt[LegacyHistoryNetwork] beaconLightClient*: Opt[LightClient] @@ -103,6 +105,24 @@ proc new*( # Get it from binary file containing SSZ encoded accumulator loadAccumulator() + historyNetwork = + if PortalSubnetwork.history in subnetworks: + Opt.some( + HistoryNetwork.new( + network, + discovery, + contentDB, + streamManager, + bootstrapRecords = bootstrapRecords, + portalConfig = config.portalConfig, + contentRequestRetries = config.contentRequestRetries, + contentQueueWorkers = config.contentQueueWorkers, + contentQueueSize = config.contentQueueSize, + ) + ) + else: + Opt.none(HistoryNetwork) + beaconNetwork = if PortalSubnetwork.beacon in subnetworks: let @@ -172,6 +192,7 @@ proc new*( discovery: discovery, contentDB: contentDB, streamManager: streamManager, + historyNetwork: historyNetwork, beaconNetwork: beaconNetwork, legacyHistoryNetwork: legacyHistoryNetwork, beaconLightClient: beaconLightClient, @@ -204,6 +225,8 @@ proc start*(n: PortalNode) = n.discovery.start() + if n.historyNetwork.isSome(): + n.historyNetwork.value.start() if n.beaconNetwork.isSome(): n.beaconNetwork.value.start() if n.legacyHistoryNetwork.isSome(): @@ -218,6 +241,8 @@ proc stop*(n: PortalNode) {.async: (raises: []).} = var futures: seq[Future[void]] + if n.historyNetwork.isSome(): + futures.add(n.historyNetwork.value.stop()) if n.beaconNetwork.isSome(): futures.add(n.beaconNetwork.value.stop()) if n.legacyHistoryNetwork.isSome(): diff --git a/portal/network/wire/portal_protocol.nim b/portal/network/wire/portal_protocol.nim index ffbb13148e..cee1d80759 100644 --- a/portal/network/wire/portal_protocol.nim +++ b/portal/network/wire/portal_protocol.nim @@ -308,10 +308,10 @@ func getProtocolId*( case network of PortalNetwork.none, PortalNetwork.mainnet: case subnetwork + of PortalSubnetwork.history: + [portalPrefix, 0x00] of PortalSubnetwork.legacyHistory: [portalPrefix, 0x0B] - of PortalSubnetwork.history: - raiseAssert "Not yet supported" of PortalSubnetwork.beacon: [portalPrefix, 0x0C] of PortalSubnetwork.transactionIndex: @@ -320,10 +320,10 @@ func getProtocolId*( [portalPrefix, 0x0F] of PortalNetwork.angelfood: case subnetwork + of PortalSubnetwork.history: + [portalPrefix, 0x40] of PortalSubnetwork.legacyHistory: [portalPrefix, 0x4B] - of PortalSubnetwork.history: - raiseAssert "Not yet supported" of PortalSubnetwork.beacon: [portalPrefix, 0x4C] of PortalSubnetwork.transactionIndex: diff --git a/portal/rpc/rpc_portal_history_api.nim b/portal/rpc/rpc_portal_history_api.nim new file mode 100644 index 0000000000..93a8c80a5c --- /dev/null +++ b/portal/rpc/rpc_portal_history_api.nim @@ -0,0 +1,148 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + json_rpc/rpcserver, + json_serialization/std/tables, + stew/byteutils, + ../common/common_types, + ../network/wire/portal_protocol, + ../network/history/history_content, + ../network/history/history_validation, + ./rpc_types + +export tables + +# Portal History Network JSON-RPC API +# Note: +# - This API is not part of the Portal Network specification yet. +# - Lower level API calls are not implemented as they are typically only used for (Hive) +# testing and it is not clear yet of this will be needed in the future. +# - Added the header parameter so that validation can happen on json-rpc server side, +# but it could also be moved to client side. +# - Could also make a less generic API + +ContentInfo.useDefaultSerializationIn JrpcConv +TraceContentLookupResult.useDefaultSerializationIn JrpcConv +TraceObject.useDefaultSerializationIn JrpcConv +NodeMetadata.useDefaultSerializationIn JrpcConv +TraceResponse.useDefaultSerializationIn JrpcConv + +# TODO: It would be cleaner to use the existing getContent/getBlockBody/getReceipts calls for +# less code duplication + automatic retries, but the specific error messages + extra content +# info would need to be added to the existing calls. +proc installPortalFinalizedHistoryApiHandlers*( + rpcServer: RpcServer, p: PortalProtocol +) = + rpcServer.rpc("portal_historyGetContent") do( + contentKeyBytes: string, headerBytes: string + ) -> ContentInfo: + let + contentKeyByteList = ContentKeyByteList.init(hexToSeqByte(contentKeyBytes)) + contentKey = decode(contentKeyByteList).valueOr: + raise invalidKeyErr() + contentId = toContentId(contentKey) + header = decodeRlp(hexToSeqByte(headerBytes), Header).valueOr: + raise invalidRequest((code: -39005, msg: "Failed to decode header: " & error)) + + p.getLocalContent(contentKeyByteList, contentId).isErrOr: + return ContentInfo(content: value.to0xHex(), utpTransfer: false) + + let contentLookupResult = (await p.contentLookup(contentKeyByteList, contentId)).valueOr: + raise contentNotFoundErr() + + validateContent(contentKey, contentLookupResult.content, header).isOkOr: + p.banNode( + contentLookupResult.receivedFrom.id, + NodeBanDurationContentLookupFailedValidation, + ) + raise invalidValueErr() + + p.storeContent( + contentKeyByteList, contentId, contentLookupResult.content, cacheContent = true + ) + + ContentInfo( + content: contentLookupResult.content.to0xHex(), + utpTransfer: contentLookupResult.utpTransfer, + ) + + rpcServer.rpc("portal_historyTraceGetContent") do( + contentKeyBytes: string, headerBytes: string + ) -> TraceContentLookupResult: + let + contentKeyByteList = ContentKeyByteList.init(hexToSeqByte(contentKeyBytes)) + contentKey = decode(contentKeyByteList).valueOr: + raise invalidKeyErr() + contentId = toContentId(contentKey) + header = decodeRlp(hexToSeqByte(headerBytes), Header).valueOr: + raise invalidRequest((code: -39005, msg: "Failed to decode header: " & error)) + + p.getLocalContent(contentKeyByteList, contentId).isErrOr: + return TraceContentLookupResult( + content: Opt.some(value), + utpTransfer: false, + trace: TraceObject( + origin: p.localNode.id, + targetId: contentId, + receivedFrom: Opt.some(p.localNode.id), + ), + ) + + # TODO: Might want to restructure the lookup result here. Potentially doing + # the json conversion in this module. + let + res = await p.traceContentLookup(contentKeyByteList, contentId) + valueBytes = res.content.valueOr: + let data = Opt.some(JrpcConv.encode(res.trace).JsonString) + raise contentNotFoundErrWithTrace(data) + + validateContent(contentKey, valueBytes, header).isOkOr: + if res.trace.receivedFrom.isSome(): + p.banNode( + res.trace.receivedFrom.get(), NodeBanDurationContentLookupFailedValidation + ) + raise invalidValueErr() + + p.storeContent(contentKeyByteList, contentId, valueBytes, cacheContent = true) + + res + + rpcServer.rpc("portal_historyPutContent") do( + contentKeyBytes: string, contentValueBytes: string + ) -> PutContentResult: + let + contentKeyByteList = ContentKeyByteList.init(hexToSeqByte(contentKeyBytes)) + _ = decode(contentKeyByteList).valueOr: + raise invalidKeyErr() + offerValueBytes = hexToSeqByte(contentValueBytes) + + # Note: Not validating content as this would have a high impact on bridge + # gossip performance. + # As no validation is done here, the content is not stored locally. + # TODO: Add default on validation by optional validation parameter. + gossipMetadata = await p.neighborhoodGossip( + Opt.none(NodeId), + ContentKeysList(@[contentKeyByteList]), + @[offerValueBytes], + enableNodeLookup = true, + ) + + PutContentResult( + storedLocally: false, + peerCount: gossipMetadata.successCount, + acceptMetadata: AcceptMetadata( + acceptedCount: gossipMetadata.acceptedCount, + genericDeclineCount: gossipMetadata.genericDeclineCount, + alreadyStoredCount: gossipMetadata.alreadyStoredCount, + notWithinRadiusCount: gossipMetadata.notWithinRadiusCount, + rateLimitedCount: gossipMetadata.rateLimitedCount, + transferInProgressCount: gossipMetadata.transferInProgressCount, + ), + ) diff --git a/portal/rpc/rpc_types.nim b/portal/rpc/rpc_types.nim index f3f8dd0863..b5c94373bc 100644 --- a/portal/rpc/rpc_types.nim +++ b/portal/rpc/rpc_types.nim @@ -64,7 +64,7 @@ template payloadTypeRequiredError*(): auto = template userSpecifiedPayloadBlockedByClientError*(): auto = UserSpecifiedPayloadBlockedByClientError.applicationError() -template invalidRequest(error: (int, string)): auto = +template invalidRequest*(error: (int, string)): auto = (ref errors.InvalidRequest)(code: error.code, msg: error.msg) template invalidKeyErr*(): auto = diff --git a/portal/tests/all_portal_tests.nim b/portal/tests/all_portal_tests.nim index e8358fbf0b..67d3c1b108 100644 --- a/portal/tests/all_portal_tests.nim +++ b/portal/tests/all_portal_tests.nim @@ -11,5 +11,6 @@ import ./test_content_db, ./wire_protocol_tests/all_wire_protocol_tests, ./legacy_history_network_tests/all_history_network_tests, + ./history_network_tests/all_history_network_tests, ./beacon_network_tests/all_beacon_network_tests, ./rpc_tests/all_rpc_tests diff --git a/portal/tests/history_network_tests/all_history_network_tests.nim b/portal/tests/history_network_tests/all_history_network_tests.nim new file mode 100644 index 0000000000..ee0ba0f2c2 --- /dev/null +++ b/portal/tests/history_network_tests/all_history_network_tests.nim @@ -0,0 +1,14 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.warning[UnusedImport]: off.} + +import + ./test_history_content_keys, + ./test_history_content_values, + ./test_history_endpoints, + ./test_history_network diff --git a/portal/tests/history_network_tests/history_test_helpers.nim b/portal/tests/history_network_tests/history_test_helpers.nim new file mode 100644 index 0000000000..372bfe4b11 --- /dev/null +++ b/portal/tests/history_network_tests/history_test_helpers.nim @@ -0,0 +1,63 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + chronos, + eth/keys, + eth/p2p/discoveryv5/protocol, + ../../database/content_db, + ../../network/wire/[portal_protocol, portal_stream], + ../../network/history/history_network, + ../test_helpers + +type HistoryNode* = ref object + discovery: protocol.Protocol + historyNetwork*: HistoryNetwork + +proc newHistoryNetwork*( + rng: ref HmacDrbgContext, + port: int, + getHeaderCallback: GetHeaderCallback = defaultNoGetHeader, +): HistoryNode = + let + node = + try: + initDiscoveryNode(rng, PrivateKey.random(rng[]), localAddress(port)) + except CatchableError as e: + raiseAssert "Failed to initialize discovery node: " & e.msg + db = ContentDB.new( + "", + uint32.high, + RadiusConfig(kind: Static, logRadius: 256), + node.localNode.id, + inMemory = true, + ) + streamManager = StreamManager.new(node) + + HistoryNode( + discovery: node, + historyNetwork: HistoryNetwork.new( + PortalNetwork.mainnet, node, db, streamManager, getHeaderCallback + ), + ) + +proc start*(n: HistoryNode) = + n.discovery.start() + n.historyNetwork.start() + +proc stop*(n: HistoryNode) {.async: (raises: []).} = + await n.historyNetwork.stop() + await n.discovery.closeWait() + n.historyNetwork.contentDB.close() + +func portalProtocol*(n: HistoryNode): PortalProtocol = + n.historyNetwork.portalProtocol + +func localNode*(n: HistoryNode): Node = + n.historyNetwork.localNode() diff --git a/portal/tests/history_network_tests/test_history_content_keys.nim b/portal/tests/history_network_tests/test_history_content_keys.nim new file mode 100644 index 0000000000..2645f28c01 --- /dev/null +++ b/portal/tests/history_network_tests/test_history_content_keys.nim @@ -0,0 +1,115 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import unittest2, stew/byteutils, ../../network/history/history_content + +suite "History Network Content Keys": + test "toContentId": + # Input + const blockNumbers = [ + 1.uint64, + 1000.uint64, + 12_345_678.uint64, + uint64.high(), + uint64.high() - 1, + uint64.high() div 2, + uint64.high() div 16 + 1, + 6148914691236517205'u64, + 12297829382473034410'u64, + 11574427654092267680'u64, + ] + + # Output + const contentIds = [ + "0001000000000000000000000000000000000000000000000000000000000001", + "03e8000000000000000000000000000000000000000000000000000000000001", + "614e3d0000000000000000000000000000000000000000000000000000000001", + "ffffffffffffffff000000000000000000000000000000000000000000000001", + "fffeffffffffffff000000000000000000000000000000000000000000000001", + "fffffffffffffffe000000000000000000000000000000000000000000000001", + "0000000000000008000000000000000000000000000000000000000000000001", + "5555aaaaaaaaaaaa000000000000000000000000000000000000000000000001", + "aaaa555555555555000000000000000000000000000000000000000000000001", + "a0a0050505050505000000000000000000000000000000000000000000000001", + ] + + for i in 0 ..< blockNumbers.len(): + let contentId = toContentId(blockNumbers[i], ContentType.receipts) + + check contentIds[i] == contentId.dumpHex() + + test "BlockBody": + # Input + const blockNumber = 12_345_678.uint64 + + # Output + const + contentKeyHex = "0x004e61bc0000000000" + contentId = + "44012581390156707874310974263613699127815223388818970640389075388176810377216" + # or + contentIdHexBE = + "614e3d0000000000000000000000000000000000000000000000000000000000" + + let contentKey = blockBodyContentKey(blockNumber) + + let encoded = encode(contentKey) + check encoded.asSeq.to0xHex == contentKeyHex + let decoded = decode(encoded) + check decoded.isSome() + + let contentKeyDecoded = decoded.get() + check: + contentKeyDecoded.contentType == contentKey.contentType + contentKeyDecoded.blockBodyKey == contentKey.blockBodyKey + + toContentId(contentKey) == parse(contentId, StUint[256], 10) + # In stint this does BE hex string + toContentId(contentKey).toHex() == contentIdHexBE + + test "Receipts": + # Input + const blockNumber = 12_345_678.uint64 + + # Output + const + contentKeyHex = "0x014e61bc0000000000" + contentId = + "44012581390156707874310974263613699127815223388818970640389075388176810377217" + # or + contentIdHexBE = + "614e3d0000000000000000000000000000000000000000000000000000000001" + + let contentKey = receiptsContentKey(blockNumber) + + let encoded = encode(contentKey) + check encoded.asSeq.to0xHex == contentKeyHex + let decoded = decode(encoded) + check decoded.isSome() + + let contentKeyDecoded = decoded.get() + check: + contentKeyDecoded.contentType == contentKey.contentType + contentKeyDecoded.receiptsKey == contentKey.receiptsKey + + toContentId(contentKey) == parse(contentId, StUint[256], 10) + # In stint this does BE hex string + toContentId(contentKey).toHex() == contentIdHexBE + + test "Invalid prefix - after valid range": + let encoded = ContentKeyByteList.init(@[byte 0x02]) + let decoded = decode(encoded) + + check decoded.isErr() + + test "Invalid key - empty input": + let encoded = ContentKeyByteList.init(@[]) + let decoded = decode(encoded) + + check decoded.isErr() diff --git a/portal/tests/history_network_tests/test_history_content_values.nim b/portal/tests/history_network_tests/test_history_content_values.nim new file mode 100644 index 0000000000..a04a06c7e3 --- /dev/null +++ b/portal/tests/history_network_tests/test_history_content_values.nim @@ -0,0 +1,60 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + unittest2, + results, + stew/byteutils, + ../../common/common_types, + ../../eth_data/yaml_utils, + ../../tools/eth_data_exporter/el_data_exporter, + ../../network/history/history_validation + +from std/os import walkDir, splitFile, PathComponent + +const testsPath = "./vendor/portal-spec-tests/tests/mainnet/history/block_data/" + +suite "History Network Content Values": + test "BlockBody and Receipts Encoding/Decoding and Verification": + for kind, path in walkDir(testsPath): + if kind == pcFile and path.splitFile.ext == ".yaml": + let + blockData = BlockData.loadFromYaml(path).valueOr: + raiseAssert "Cannot read test vector: " & error + + headerEncoded = blockData.header.hexToSeqByte() + bodyEncoded = blockData.body.hexToSeqByte() + + let header = decodeRlp(headerEncoded, Header).expect("Valid header") + + let contentKey = blockBodyContentKey(header.number) + check validateContent(contentKey, bodyEncoded, header).isOk() + + let contentValue = decodeRlp(bodyEncoded, BlockBody) + check contentValue.isOk() + check rlp.encode(contentValue.get()) == bodyEncoded + + test "Receipts Encoding/Decoding and Verification": + for kind, path in walkDir(testsPath): + if kind == pcFile and path.splitFile.ext == ".yaml": + let + blockData = BlockData.loadFromYaml(path).valueOr: + raiseAssert "Cannot read test vector: " & error + + headerEncoded = blockData.header.hexToSeqByte() + receiptsEncoded = blockData.receipts.hexToSeqByte() + + let header = decodeRlp(headerEncoded, Header).expect("Valid header") + + let contentKey = receiptsContentKey(header.number) + check validateContent(contentKey, receiptsEncoded, header).isOk() + + let contentValue = decodeRlp(receiptsEncoded, StoredReceipts) + check contentValue.isOk() + check rlp.encode(contentValue.get()) == receiptsEncoded diff --git a/portal/tests/history_network_tests/test_history_endpoints.nim b/portal/tests/history_network_tests/test_history_endpoints.nim new file mode 100644 index 0000000000..4ba714d3a4 --- /dev/null +++ b/portal/tests/history_network_tests/test_history_endpoints.nim @@ -0,0 +1,95 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + unittest2, + stew/byteutils, + chronos/unittest2/asynctests, + ../../eth_data/yaml_utils, + ../../tools/eth_data_exporter/el_data_exporter, + ../../network/wire/portal_protocol, + ../../network/history/history_endpoints, + ./history_test_helpers + +from std/os import walkDir, splitFile, PathComponent + +const testsPath = "./vendor/portal-spec-tests/tests/mainnet/history/block_data/" + +suite "History Network Endpoints": + asyncTest "GetBlockBody": + let + rng = newRng() + node1 = newHistoryNetwork(rng, 9001) + node2 = newHistoryNetwork(rng, 9002) + + node1.start() + node2.start() + + check: + node1.portalProtocol().addNode(node2.localNode()) == Added + node2.portalProtocol().addNode(node1.localNode()) == Added + + (await node1.portalProtocol().ping(node2.localNode())).isOk() + (await node2.portalProtocol().ping(node1.localNode())).isOk() + + for kind, path in walkDir(testsPath): + if kind == pcFile and path.splitFile.ext == ".yaml": + let + blockData = BlockData.loadFromYaml(path).valueOr: + raiseAssert "Cannot read test vector: " & error + + headerEncoded = blockData.header.hexToSeqByte() + bodyEncoded = blockData.body.hexToSeqByte() + header = decodeRlp(headerEncoded, Header).expect("Valid header") + contentKey = blockBodyContentKey(header.number) + + node1.portalProtocol().storeContent( + contentKey.encode(), contentKey.toContentId(), bodyEncoded + ) + + check (await node2.historyNetwork.getBlockBody(header)).isOk() + + await node1.stop() + await node2.stop() + + asyncTest "GetReceipts": + let + rng = newRng() + node1 = newHistoryNetwork(rng, 9001) + node2 = newHistoryNetwork(rng, 9002) + + node1.start() + node2.start() + + check: + node1.portalProtocol().addNode(node2.localNode()) == Added + node2.portalProtocol().addNode(node1.localNode()) == Added + + (await node1.portalProtocol().ping(node2.localNode())).isOk() + (await node2.portalProtocol().ping(node1.localNode())).isOk() + + for kind, path in walkDir(testsPath): + if kind == pcFile and path.splitFile.ext == ".yaml": + let + blockData = BlockData.loadFromYaml(path).valueOr: + raiseAssert "Cannot read test vector: " & error + + headerEncoded = blockData.header.hexToSeqByte() + receiptsEncoded = blockData.receipts.hexToSeqByte() + header = decodeRlp(headerEncoded, Header).expect("Valid header") + contentKey = receiptsContentKey(header.number) + + node1.portalProtocol().storeContent( + contentKey.encode(), contentKey.toContentId(), receiptsEncoded + ) + + check (await node2.historyNetwork.getReceipts(header)).isOk() + + await node1.stop() + await node2.stop() diff --git a/portal/tests/history_network_tests/test_history_network.nim b/portal/tests/history_network_tests/test_history_network.nim new file mode 100644 index 0000000000..3d7977c96f --- /dev/null +++ b/portal/tests/history_network_tests/test_history_network.nim @@ -0,0 +1,158 @@ +# Nimbus +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + unittest2, + stew/byteutils, + chronos/unittest2/asynctests, + ../../eth_data/yaml_utils, + ../../tools/eth_data_exporter/el_data_exporter, + ../../network/wire/portal_protocol, + ../../network/history/history_network, + ./history_test_helpers + +from eth/common/accounts import EMPTY_ROOT_HASH + +const testsPath = "./vendor/portal-spec-tests/tests/mainnet/history/block_data/" + +suite "History Network": + asyncTest "Offer content": + let + path = testsPath & "block-data-22869878.yaml" + blockData = BlockData.loadFromYaml(path).valueOr: + raiseAssert "Cannot read test vector: " & error + + headerEncoded = blockData.header.hexToSeqByte() + bodyEncoded = blockData.body.hexToSeqByte() + header = decodeRlp(headerEncoded, Header).expect("Valid header") + contentKey = blockBodyContentKey(header.number) + contentKV = ContentKV(contentKey: contentKey.encode(), content: bodyEncoded) + + proc getHeader( + blockNumber: uint64 + ): Future[Result[Header, string]] {.async: (raises: [CancelledError]), gcsafe.} = + if header.number != blockNumber: + return err( + "Block number mismatch: expected " & $blockNumber & ", got " & $header.number + ) + ok(header) + + let + rng = newRng() + node1 = newHistoryNetwork(rng, 9001) + node2 = newHistoryNetwork(rng, 9002, getHeader) + + node1.start() + node2.start() + + check: + node1.portalProtocol().addNode(node2.localNode()) == Added + node2.portalProtocol().addNode(node1.localNode()) == Added + + (await node1.portalProtocol().ping(node2.localNode())).isOk() + (await node2.portalProtocol().ping(node1.localNode())).isOk() + + # This version of offer does not require to store the content locally + (await node1.portalProtocol().offer(node2.localNode(), @[contentKV])).isOk() + + node2 + .portalProtocol() + .getLocalContent(contentKey.encode(), contentKey.toContentId()) + .isSome() + + await node1.stop() + await node2.stop() + + asyncTest "Offer - Maximum plus one content Keys in 1 message": + let + rng = newRng() + node1 = newHistoryNetwork(rng, 9001) + node2 = newHistoryNetwork(rng, 9002) + + node1.start() + node2.start() + + check: + node1.portalProtocol().addNode(node2.localNode()) == Added + node2.portalProtocol().addNode(node1.localNode()) == Added + + (await node1.portalProtocol().ping(node2.localNode())).isOk() + (await node2.portalProtocol().ping(node1.localNode())).isOk() + + var list: seq[ContentKeyByteList] + for i in 0 ..< contentKeysLimit + 1: + list.add(blockBodyContentKey(i.uint64).encode()) + # This is invalid way of creating ContentKeysList and will allow to go over the limit + let contentKeyList = ContentKeysList(list) + + check (await node1.portalProtocol().offer(node2.localNode(), contentKeyList)).isErr() + + await node1.stop() + await node2.stop() + + asyncTest "Offer - Maximum block bodies in 1 message": + var count = 0 + proc getHeader( + blockNumber: uint64 + ): Future[Result[Header, string]] {.async: (raises: [CancelledError]), gcsafe.} = + count.inc() + # Return header with correct number and roots filled in for empty blocks + ok( + Header( + number: blockNumber, + transactionsRoot: EMPTY_ROOT_HASH, + ommersHash: EMPTY_UNCLE_HASH, + withdrawalsRoot: Opt.some(EMPTY_ROOT_HASH), + ) + ) + + let + rng = newRng() + node1 = newHistoryNetwork(rng, 9001) + node2 = newHistoryNetwork(rng, 9002, getHeader) + + node1.start() + node2.start() + + check: + node1.portalProtocol().addNode(node2.localNode()) == Added + node2.portalProtocol().addNode(node1.localNode()) == Added + + (await node1.portalProtocol().ping(node2.localNode())).isOk() + (await node2.portalProtocol().ping(node1.localNode())).isOk() + + # All blocks we send in the test are empty + let emptyWithdrawals: seq[Withdrawal] = @[] + let emptyBody = rlp.encode( + BlockBody(transactions: @[], uncles: @[], withdrawals: Opt.some(emptyWithdrawals)) + ) + + var contentKVList: seq[ContentKV] + for i in 0 ..< contentKeysLimit: + contentKVList.add( + ContentKV( + contentKey: blockBodyContentKey(i.uint64).encode(), content: emptyBody + ) + ) + + check (await node1.portalProtocol().offer(node2.localNode(), contentKVList)).isOk() + + # Wait for contentQueueWorker to process all the content + while count < contentKeysLimit: + await sleepAsync(10.milliseconds) + + for contentKV in contentKVList: + let contentId = contentKV.contentKey.toContentId().expect("Valid content key") + check node2 + .portalProtocol() + .getLocalContent(contentKV.contentKey, contentId) + .isSome() + + await node1.stop() + await node2.stop() diff --git a/vendor/portal-spec-tests b/vendor/portal-spec-tests index 3b3c0cd12d..45e46aa34a 160000 --- a/vendor/portal-spec-tests +++ b/vendor/portal-spec-tests @@ -1 +1 @@ -Subproject commit 3b3c0cd12ddad5f4293df3d46fd0b727bd61d305 +Subproject commit 45e46aa34a79f636fed3b09e7ab03c0be6a04ea3