diff --git a/packages/horizon/contracts/interfaces/IRecurringCollector.sol b/packages/horizon/contracts/interfaces/IRecurringCollector.sol index 704515aa7..5bf597090 100644 --- a/packages/horizon/contracts/interfaces/IRecurringCollector.sol +++ b/packages/horizon/contracts/interfaces/IRecurringCollector.sol @@ -147,6 +147,7 @@ interface IRecurringCollector is IAuthorizable, IPaymentsCollector { * @param tokens The amount of tokens to collect * @param dataServiceCut The data service cut in parts per million * @param receiverDestination The address where the collected fees should be sent + * @param maxSlippage Max acceptable tokens to lose due to rate limiting, or type(uint256).max to ignore */ struct CollectParams { bytes16 agreementId; @@ -154,6 +155,7 @@ interface IRecurringCollector is IAuthorizable, IPaymentsCollector { uint256 tokens; uint256 dataServiceCut; address receiverDestination; + uint256 maxSlippage; } /** @@ -369,6 +371,14 @@ interface IRecurringCollector is IAuthorizable, IPaymentsCollector { */ error RecurringCollectorInvalidUpdateNonce(bytes16 agreementId, uint32 expected, uint32 provided); + /** + * @notice Thrown when collected tokens are less than requested beyond the allowed slippage + * @param requested The amount of tokens requested to collect + * @param actual The actual amount that would be collected + * @param maxSlippage The maximum allowed slippage + */ + error RecurringCollectorExcessiveSlippage(uint256 requested, uint256 actual, uint256 maxSlippage); + /** * @dev Accept an indexing agreement. * @param signedRCA The signed Recurring Collection Agreement which is to be accepted. diff --git a/packages/horizon/contracts/payments/collectors/RecurringCollector.sol b/packages/horizon/contracts/payments/collectors/RecurringCollector.sol index 56d2b9d5b..79f1d1a12 100644 --- a/packages/horizon/contracts/payments/collectors/RecurringCollector.sol +++ b/packages/horizon/contracts/payments/collectors/RecurringCollector.sol @@ -336,6 +336,12 @@ contract RecurringCollector is EIP712, GraphDirectory, Authorizable, IRecurringC if (_params.tokens != 0) { tokensToCollect = _requireValidCollect(agreement, _params.agreementId, _params.tokens, collectionSeconds); + uint256 slippage = _params.tokens - tokensToCollect; + require( + slippage <= _params.maxSlippage, + RecurringCollectorExcessiveSlippage(_params.tokens, tokensToCollect, _params.maxSlippage) + ); + _graphPaymentsEscrow().collect( _paymentType, agreement.payer, diff --git a/packages/horizon/test/unit/payments/recurring-collector/collect.t.sol b/packages/horizon/test/unit/payments/recurring-collector/collect.t.sol index a972734a6..d44284e9f 100644 --- a/packages/horizon/test/unit/payments/recurring-collector/collect.t.sol +++ b/packages/horizon/test/unit/payments/recurring-collector/collect.t.sol @@ -2,6 +2,7 @@ pragma solidity 0.8.27; import { IRecurringCollector } from "../../../../contracts/interfaces/IRecurringCollector.sol"; +import { IGraphPayments } from "../../../../contracts/interfaces/IGraphPayments.sol"; import { IHorizonStakingTypes } from "../../../../contracts/interfaces/internal/IHorizonStakingTypes.sol"; import { RecurringCollectorSharedTest } from "./shared.t.sol"; @@ -309,5 +310,138 @@ contract RecurringCollectorCollectTest is RecurringCollectorSharedTest { uint256 collected = _recurringCollector.collect(_paymentType(fuzzy.unboundedPaymentType), data); assertEq(collected, tokens); } + + function test_Collect_RevertWhen_ExceedsMaxSlippage() public { + // Setup: Create agreement with known parameters + IRecurringCollector.RecurringCollectionAgreement memory rca; + rca.deadline = uint64(block.timestamp + 1000); + rca.endsAt = uint64(block.timestamp + 2000); + rca.payer = address(0x123); + rca.dataService = address(0x456); + rca.serviceProvider = address(0x789); + rca.maxInitialTokens = 0; // No initial tokens to keep calculation simple + rca.maxOngoingTokensPerSecond = 1 ether; // 1 token per second + rca.minSecondsPerCollection = 60; // 1 minute + rca.maxSecondsPerCollection = 3600; // 1 hour + rca.nonce = 1; + rca.metadata = ""; + + // Accept the agreement + _recurringCollectorHelper.authorizeSignerWithChecks(rca.payer, 1); + IRecurringCollector.SignedRCA memory signedRCA = _recurringCollectorHelper.generateSignedRCA(rca, 1); + bytes16 agreementId = _accept(signedRCA); + + // Do a first collection to use up initial tokens allowance + skip(rca.minSecondsPerCollection); + IRecurringCollector.CollectParams memory firstCollection = IRecurringCollector.CollectParams({ + agreementId: agreementId, + collectionId: keccak256("first"), + tokens: 1 ether, // Small amount + dataServiceCut: 0, + receiverDestination: rca.serviceProvider, + maxSlippage: type(uint256).max + }); + vm.prank(rca.dataService); + _recurringCollector.collect(IGraphPayments.PaymentTypes.IndexingFee, _generateCollectData(firstCollection)); + + // Wait minimum collection time again for second collection + skip(rca.minSecondsPerCollection); + + // Calculate expected narrowing: max allowed is 60 tokens (60 seconds * 1 token/second) + uint256 maxAllowed = rca.maxOngoingTokensPerSecond * rca.minSecondsPerCollection; // 60 tokens + uint256 requested = maxAllowed + 50 ether; // Request 110 tokens + uint256 expectedSlippage = requested - maxAllowed; // 50 tokens + uint256 maxSlippage = expectedSlippage - 1; // Allow up to 49 tokens slippage + + // Create collect params with slippage protection + IRecurringCollector.CollectParams memory collectParams = IRecurringCollector.CollectParams({ + agreementId: agreementId, + collectionId: keccak256("test"), + tokens: requested, + dataServiceCut: 0, + receiverDestination: rca.serviceProvider, + maxSlippage: maxSlippage + }); + + bytes memory data = _generateCollectData(collectParams); + + // Expect revert due to excessive slippage (50 > 49) + vm.expectRevert( + abi.encodeWithSelector( + IRecurringCollector.RecurringCollectorExcessiveSlippage.selector, + requested, + maxAllowed, + maxSlippage + ) + ); + vm.prank(rca.dataService); + _recurringCollector.collect(IGraphPayments.PaymentTypes.IndexingFee, data); + } + + function test_Collect_OK_WithMaxSlippageDisabled() public { + // Setup: Create agreement with known parameters + IRecurringCollector.RecurringCollectionAgreement memory rca; + rca.deadline = uint64(block.timestamp + 1000); + rca.endsAt = uint64(block.timestamp + 2000); + rca.payer = address(0x123); + rca.dataService = address(0x456); + rca.serviceProvider = address(0x789); + rca.maxInitialTokens = 0; // No initial tokens to keep calculation simple + rca.maxOngoingTokensPerSecond = 1 ether; // 1 token per second + rca.minSecondsPerCollection = 60; // 1 minute + rca.maxSecondsPerCollection = 3600; // 1 hour + rca.nonce = 1; + rca.metadata = ""; + + // Accept the agreement + _recurringCollectorHelper.authorizeSignerWithChecks(rca.payer, 1); + IRecurringCollector.SignedRCA memory signedRCA = _recurringCollectorHelper.generateSignedRCA(rca, 1); + bytes16 agreementId = _accept(signedRCA); + + // Do a first collection to use up initial tokens allowance + skip(rca.minSecondsPerCollection); + IRecurringCollector.CollectParams memory firstCollection = IRecurringCollector.CollectParams({ + agreementId: agreementId, + collectionId: keccak256("first"), + tokens: 1 ether, // Small amount + dataServiceCut: 0, + receiverDestination: rca.serviceProvider, + maxSlippage: type(uint256).max + }); + vm.prank(rca.dataService); + _recurringCollector.collect(IGraphPayments.PaymentTypes.IndexingFee, _generateCollectData(firstCollection)); + + // Wait minimum collection time again for second collection + skip(rca.minSecondsPerCollection); + + // Calculate expected narrowing: max allowed is 60 tokens (60 seconds * 1 token/second) + uint256 maxAllowed = rca.maxOngoingTokensPerSecond * rca.minSecondsPerCollection; // 60 tokens + uint256 requested = maxAllowed + 50 ether; // Request 110 tokens (will be narrowed to 60) + + // Create collect params with slippage disabled (type(uint256).max) + IRecurringCollector.CollectParams memory collectParams = IRecurringCollector.CollectParams({ + agreementId: agreementId, + collectionId: keccak256("test"), + tokens: requested, + dataServiceCut: 0, + receiverDestination: rca.serviceProvider, + maxSlippage: type(uint256).max + }); + + bytes memory data = _generateCollectData(collectParams); + + // Should succeed despite slippage when maxSlippage is disabled + _expectCollectCallAndEmit( + rca, + agreementId, + IGraphPayments.PaymentTypes.IndexingFee, + collectParams, + maxAllowed // Will collect the narrowed amount + ); + + vm.prank(rca.dataService); + uint256 collected = _recurringCollector.collect(IGraphPayments.PaymentTypes.IndexingFee, data); + assertEq(collected, maxAllowed); + } /* solhint-enable graph/func-name-mixedcase */ } diff --git a/packages/horizon/test/unit/payments/recurring-collector/shared.t.sol b/packages/horizon/test/unit/payments/recurring-collector/shared.t.sol index 9a564086e..2e76c048e 100644 --- a/packages/horizon/test/unit/payments/recurring-collector/shared.t.sol +++ b/packages/horizon/test/unit/payments/recurring-collector/shared.t.sol @@ -233,7 +233,8 @@ contract RecurringCollectorSharedTest is Test, Bounder { collectionId: _collectionId, tokens: _tokens, dataServiceCut: _dataServiceCut, - receiverDestination: _rca.serviceProvider + receiverDestination: _rca.serviceProvider, + maxSlippage: type(uint256).max }); } diff --git a/packages/subgraph-service/contracts/libraries/IndexingAgreement.sol b/packages/subgraph-service/contracts/libraries/IndexingAgreement.sol index d1f42f2b5..1935a6197 100644 --- a/packages/subgraph-service/contracts/libraries/IndexingAgreement.sol +++ b/packages/subgraph-service/contracts/libraries/IndexingAgreement.sol @@ -96,12 +96,14 @@ library IndexingAgreement { * @param poi The proof of indexing (POI) * @param poiBlockNumber The block number of the POI * @param metadata Additional metadata associated with the collection + * @param maxSlippage Max acceptable tokens to lose due to rate limiting, or type(uint256).max to ignore */ struct CollectIndexingFeeDataV1 { uint256 entities; bytes32 poi; uint256 poiBlockNumber; bytes metadata; + uint256 maxSlippage; } /** @@ -565,7 +567,8 @@ library IndexingAgreement { collectionId: bytes32(uint256(uint160(wrapper.agreement.allocationId))), tokens: expectedTokens, dataServiceCut: 0, - receiverDestination: params.receiverDestination + receiverDestination: params.receiverDestination, + maxSlippage: data.maxSlippage }) ) ); diff --git a/packages/subgraph-service/test/unit/subgraphService/indexing-agreement/collect.t.sol b/packages/subgraph-service/test/unit/subgraphService/indexing-agreement/collect.t.sol index 711154be5..f41bdf976 100644 --- a/packages/subgraph-service/test/unit/subgraphService/indexing-agreement/collect.t.sol +++ b/packages/subgraph-service/test/unit/subgraphService/indexing-agreement/collect.t.sol @@ -45,7 +45,8 @@ contract SubgraphServiceIndexingAgreementCollectTest is SubgraphServiceIndexingA collectionId: bytes32(uint256(uint160(indexerState.allocationId))), tokens: 0, dataServiceCut: 0, - receiverDestination: indexerState.addr + receiverDestination: indexerState.addr, + maxSlippage: type(uint256).max }) ); uint256 tokensCollected = bound(unboundedTokensCollected, 1, indexerState.tokens / stakeToFeesRatio); diff --git a/packages/subgraph-service/test/unit/subgraphService/indexing-agreement/shared.t.sol b/packages/subgraph-service/test/unit/subgraphService/indexing-agreement/shared.t.sol index f04be267b..b51008c20 100644 --- a/packages/subgraph-service/test/unit/subgraphService/indexing-agreement/shared.t.sol +++ b/packages/subgraph-service/test/unit/subgraphService/indexing-agreement/shared.t.sol @@ -392,7 +392,8 @@ contract SubgraphServiceIndexingAgreementSharedTest is SubgraphServiceTest, Boun entities: _entities, poi: _poi, poiBlockNumber: _poiBlock, - metadata: _metadata + metadata: _metadata, + maxSlippage: type(uint256).max }) ); }