
Add getAssetChanges — a new JSON-RPC endpoint for incremental asset indexing#269

Open
joenoon wants to merge 3 commits into metaplex-foundation:main from joenoon:get-asset-changes-proposal

Conversation


@joenoon joenoon commented Feb 27, 2026

Maintaining a continuously up-to-date index of all NFTs across owners, collections, and assets requires incremental sync. The DAS API stores slot_updated on every asset but has no way to expose it as a change feed.

We investigated building this on searchAssets but it's not viable:

  • Cursor pagination requires sortBy: Id, so you can't cursor-paginate while sorting by slot
  • slot_updated is never returned in the response — no per-item checkpoint marker
  • last_indexed_slot in searchAssets is a global watermark, not per-asset

A dedicated endpoint avoids these constraints without risk to existing searchAssets clients.

What it does

Returns a forward-paginated feed of assets ordered by slot_updated, with enough per-item fields to avoid most getAsset or getAssetBatch follow-up requests:

Request:

{
  "method": "getAssetChanges",
  "params": {
    "afterSlot": 250000000,
    "limit": 1000,
    "after": "<cursor>"
  }
}

Response:

{
  "currentSlot": 250731042,
  "items": [
    {
      "id": "Abc...xyz",
      "slotUpdated": 250000183,
      "owner": "Wallet...1",
      "delegate": null,
      "burnt": false,
      "collection": "Col...mint",
      "metadataUrl": "https://arweave.net/abc"
    }
  ],
  "after": "<next_cursor>"
}
  • currentSlot — max indexed slot, so clients know when they've reached head
  • slotUpdated — per-item checkpoint marker
  • owner / delegate / burnt — ownership and listing state
  • collection — from asset_grouping via LEFT JOIN LATERAL
  • metadataUrl — from asset_data via LEFT JOIN
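Taken together, these fields support a simple client-side checkpoint loop: drain pages until after is absent, tracking the highest slotUpdated seen. A minimal sketch of that loop, where Page and fetch_page are hypothetical stand-ins for the JSON-RPC call (the real items carry more fields than the slot used here):

```rust
// Sketch of the checkpoint loop implied by currentSlot / slotUpdated / after.
// `Page` and `fetch_page` are illustrative stand-ins, not types from the PR.
#[derive(Debug)]
struct Page {
    current_slot: i64,      // max indexed slot at query time
    items: Vec<i64>,        // slotUpdated per item; real items carry more fields
    after: Option<String>,  // next-page cursor, absent when the feed is drained
}

/// Drain pages until the feed is exhausted, returning the new checkpoint slot.
fn sync(mut fetch_page: impl FnMut(Option<String>) -> Page, mut checkpoint: i64) -> i64 {
    let mut cursor: Option<String> = None;
    loop {
        let page = fetch_page(cursor.take());
        for slot in &page.items {
            // slotUpdated is the per-item checkpoint marker.
            checkpoint = checkpoint.max(*slot);
        }
        match page.after {
            Some(next) => cursor = Some(next),
            // No cursor: we've reached head (page.current_slot).
            None => break,
        }
    }
    checkpoint
}

fn main() {
    // Fake two-page feed: slots 10, 20 on page one, slot 30 on page two.
    let mut calls = 0;
    let checkpoint = sync(
        |_cursor| {
            calls += 1;
            if calls == 1 {
                Page { current_slot: 100, items: vec![10, 20], after: Some("c1".into()) }
            } else {
                Page { current_slot: 100, items: vec![30], after: None }
            }
        },
        0,
    );
    assert_eq!(checkpoint, 30);
    assert_eq!(calls, 2);
}
```

On the next poll, the client passes the saved checkpoint as afterSlot and repeats.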

Cursor pagination uses composite keyset (slot_updated, id) encoded as bs58. afterSlot is exclusive (strictly >) to enable clean checkpoint semantics. A migration adds a composite index on (slot_updated, id) to support this.
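A std-only sketch of what such a composite cursor can look like, assuming the layout [8-byte big-endian slot_updated][raw asset id bytes]. The PR's actual encode_change_cursor/decode_change_cursor use bs58; this sketch hex-encodes purely to avoid a dependency:

```rust
// Illustrative composite-keyset cursor: slot as big-endian i64, then raw id
// bytes, hex-encoded. The real implementation encodes with bs58 instead.
fn encode_change_cursor(slot: i64, id: &[u8]) -> String {
    let mut bytes = slot.to_be_bytes().to_vec();
    bytes.extend_from_slice(id);
    bytes.iter().map(|b| format!("{:02x}", b)).collect()
}

fn decode_change_cursor(cursor: &str) -> Result<(i64, Vec<u8>), String> {
    // Minimum valid size: 8 slot bytes (16 hex chars) + at least 1 id byte.
    if cursor.len() % 2 != 0 || cursor.len() < 18 {
        return Err("cursor too short".into());
    }
    let bytes: Result<Vec<u8>, _> = (0..cursor.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&cursor[i..i + 2], 16))
        .collect();
    let bytes = bytes.map_err(|e| e.to_string())?;
    let mut slot_bytes = [0u8; 8];
    slot_bytes.copy_from_slice(&bytes[..8]);
    Ok((i64::from_be_bytes(slot_bytes), bytes[8..].to_vec()))
}

fn main() {
    let cursor = encode_change_cursor(250_000_183, &[1, 2, 3]);
    let (slot, id) = decode_change_cursor(&cursor).unwrap();
    assert_eq!(slot, 250_000_183);
    assert_eq!(id, vec![1, 2, 3]);
    assert!(decode_change_cursor("abc").is_err()); // rejects too-short input
}
```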

Implementation notes

  • Raw SQL with LEFT JOIN LATERAL follows established patterns in dao/scopes/asset.rs and dapi/change_logs.rs
  • verified = true OR verified IS NULL matches the backward-compatibility pattern used in every collection query (older indexer versions didn't write the verified flag)
  • encode_change_cursor / decode_change_cursor are public functions in digital_asset_types for testability

Tests

  • 10 unit tests: response serialization (camelCase keys, skip_serializing_if), cursor encode/decode round-trips, edge cases (zero slot, max slot, minimum valid size), invalid input rejection (too short, bad base58)
  • 3 integration tests: basic query with seeded NFT, cursor pagination across pages, empty database behavior

UPDATES — commit 2: assetTypes filter, type response field, creator field

The initial endpoint returns all asset types indiscriminately. Since the asset table indexes everything — NFTs, fungible tokens, unknown types — NFT-focused consumers get flooded with irrelevant changes and vice versa.

This commit adds:

  • assetTypes (required request parameter) — filters by simplified category: NFT, FungibleAsset, FungibleToken, Custom. Maps to specification_asset_class values in the DB via a parameterized IN (...) clause with ::text cast.
  • type (response field) — the precise Interface value (V1_NFT, ProgrammableNFT, MplCoreAsset, etc.), derived from specification_version + specification_asset_class using the existing Interface::from impl.
  • creator (response field) — first verified creator via a new LEFT JOIN LATERAL on asset_creators, further reducing getAsset follow-up calls.

Request:

{
  "method": "getAssetChanges",
  "params": {
    "assetTypes": ["NFT"],
    "afterSlot": 250000000,
    "limit": 1000
  }
}

Response items now include:

{
  "id": "Abc...xyz",
  "type": "ProgrammableNFT",
  "slotUpdated": 250000183,
  "owner": "Wallet...1",
  "creator": "Creator...1",
  ...
}

assetTypes is required and validated non-empty. Categories are deduplicated before expansion to DB values.
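This validate-dedupe-expand step can be sketched as follows. The specification_asset_class strings below are assumptions chosen for illustration, not the exact DB values in the PR:

```rust
// Hedged sketch of the assetTypes -> specification_asset_class expansion.
// The class strings are illustrative assumptions, not the PR's exact values.
use std::collections::BTreeSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum AssetCategory {
    Nft,
    FungibleAsset,
    FungibleToken,
    Custom,
}

fn to_asset_classes(categories: &[AssetCategory]) -> Result<Vec<&'static str>, String> {
    // assetTypes is a required parameter and must be non-empty.
    if categories.is_empty() {
        return Err("assetTypes must be non-empty".into());
    }
    // Deduplicate categories before expanding to DB values.
    let unique: BTreeSet<_> = categories.iter().copied().collect();
    let mut classes = Vec::new();
    for category in unique {
        match category {
            // One simplified category can map to several DB classes.
            AssetCategory::Nft => classes.extend(["nft", "programmable_nft", "mpl_core_asset"]),
            AssetCategory::FungibleAsset => classes.push("fungible_asset"),
            AssetCategory::FungibleToken => classes.push("fungible_token"),
            AssetCategory::Custom => classes.push("unknown"),
        }
    }
    Ok(classes)
}

fn main() {
    // Duplicates collapse before expansion.
    let classes = to_asset_classes(&[AssetCategory::Nft, AssetCategory::Nft]).unwrap();
    assert_eq!(classes.len(), 3);
    assert!(classes.contains(&"nft"));
    assert!(to_asset_classes(&[]).is_err()); // empty input is rejected
}
```

The resulting class list would then feed the parameterized IN (...) clause described above.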

Tests

  • 4 new unit tests: AssetCategory serialization/deserialization round-trips, to_asset_classes mapping, type field always-present assertion
  • Existing unit and integration tests updated for new required field and response shape

Implement a new JSON-RPC endpoint for slot-anchored incremental asset
indexing. The endpoint returns a forward-paginated feed of asset changes
ordered by slot_updated, enabling clients to maintain a local index
without full sweeps.

The response includes per-item slotUpdated, owner, delegate, burnt,
collection, and metadataUrl — sufficient to track ownership, listing
state, and collection membership without secondary getAsset calls.

Cursor pagination uses composite keyset (slot_updated, id) for constant
time hops at any depth. The query uses LEFT JOIN LATERAL for efficient
single-collection lookups per asset.

Includes:
- Core implementation: request/response types, SQL query with keyset
  pagination, cursor encode/decode as public functions
- 10 unit tests: response serialization (camelCase, skip_serializing_if),
  cursor encode/decode round-trips, edge cases, invalid input rejection
- 3 integration tests: basic query, cursor pagination across pages,
  empty database behavior

coderabbitai bot commented Feb 27, 2026

Summary by CodeRabbit

  • New Features

    • Added getAssetChanges RPC to fetch a paginated feed of asset changes (cursor and after_slot pagination), filterable by asset category, capped limit, and returning current slot, after cursor, and detailed items (id, type, slot, owner/delegate/creator, collection, metadata_url, burnt).
  • Chores

    • Added a database index to optimize asset change queries.
  • Tests

    • Added unit and integration tests covering serialization, cursor encode/decode, pagination, and empty-state behavior.

Walkthrough

Adds a new get_asset_changes RPC with cursor-based pagination, DTOs (AssetChangeItem/List, AssetCategory), DAO support and dapi helpers (encode/decode cursor), a DB migration for a composite index, and unit + integration tests for pagination and serialization.

Changes

Cohort / File(s) — Summary

  • API Surface & Wiring (das_api/src/api/mod.rs, das_api/src/api/api_impl.rs, das_api/src/builder.rs) — Adds GetAssetChanges payload and get_asset_changes RPC (snake_case + camelCase alias). Implements parse_change_cursor, request validation (asset_types, limit, after/after_slot), cursor decoding, effective after_slot computation, and delegates to the dapi layer.
  • Public Response Models (digital_asset_types/src/rpc/response.rs) — Introduces the AssetCategory enum, AssetChangeItem, and AssetChangeList with camelCase serde naming and optional field omission.
  • DAPI: Cursor + Transformation (digital_asset_types/src/dapi/get_asset_changes.rs, digital_asset_types/src/dapi/mod.rs) — Implements encode_change_cursor/decode_change_cursor and get_asset_changes, which calls the DAO, maps rows to AssetChangeItem (base58 encoding), computes the next after cursor, and returns AssetChangeList.
  • DAO / Database Query (digital_asset_types/src/dao/scopes/asset.rs) — Adds AssetChangeRow and a get_asset_changes DAO with dynamic SQL supporting cursor (slot+id) or after_slot filtering, an asset_class IN clause, and lateral joins for collection/creator; returns rows plus the current max slot.
  • Database Migration (migration/src/m20260226_120101_add_idx_asset_slot_updated_id.rs, migration/src/lib.rs) — Adds a migration creating a concurrent composite index on asset(slot_updated ASC, id ASC) and registers it; the down migration drops the index.
  • Tests (digital_asset_types/tests/get_asset_changes.rs, integration_tests/tests/integration_tests/asset_changes_tests.rs, integration_tests/tests/integration_tests/main.rs) — Adds unit tests for cursor encode/decode and serialization, and integration tests for basic pagination, cursor-based pagination, and empty-state behavior.

Sequence Diagram

sequenceDiagram
    autonumber
    participant Client as Client
    participant RpcApi as RPC API
    participant DasApi as DasApi
    participant Dapi as Dapi Module
    participant DB as Database

    Client->>RpcApi: getAssetChanges(payload)
    RpcApi->>DasApi: get_asset_changes(GetAssetChanges)
    DasApi->>DasApi: validate payload (asset_types, limit, after_slot)
    alt after cursor provided
        DasApi->>Dapi: decode_change_cursor(after)
        Dapi-->>DasApi: cursor_slot, cursor_id
    else no cursor
        DasApi->>DasApi: use after_slot only
    end
    DasApi->>DB: asset::get_asset_changes(after_slot, cursor_slot, cursor_id, limit, asset_classes)
    DB-->>DasApi: (rows, current_slot)
    DasApi->>Dapi: transform rows -> AssetChangeItem (base58 encode)
    Dapi->>Dapi: encode_change_cursor(last_row)
    Dapi-->>DasApi: AssetChangeList(current_slot, items, after)
    DasApi-->>RpcApi: AssetChangeList
    RpcApi-->>Client: JSON response

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • NicolasPennie
  • pmantica11
  • andrii-kl
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning. Docstring coverage is 73.53%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Title check — ✅ Passed. The title clearly and specifically describes the main change: adding a new getAssetChanges JSON-RPC endpoint for incremental asset indexing.
  • Description check — ✅ Passed. The description is comprehensive, covering motivation, implementation details, API request/response examples, testing strategy, and the second-commit update with assetTypes filtering and additional fields.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@digital_asset_types/src/dao/scopes/asset.rs`:
- Around line 882-896: The MAX(slot_updated) query in current_slot_stmt (built
with Statement::from_string) can cause a full table scan on the asset table; to
mitigate, consider replacing the direct aggregation with a more efficient
approach: maintain and query a lightweight metadata table or a dedicated index
that stores the latest slot, or use a descending index scan by querying "SELECT
slot_updated FROM asset ORDER BY slot_updated DESC LIMIT 1" via the same
Statement API (referencing current_slot_stmt, SlotRow, and current_slot) so
Postgres can use an index; alternatively, add/update a small cached table
updated in asset write paths to return the current slot without scanning the
full asset table.

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 37104b4 and 72526cf.

📒 Files selected for processing (27)
  • das_api/src/api/api_impl.rs
  • das_api/src/api/mod.rs
  • das_api/src/builder.rs
  • digital_asset_types/src/dao/scopes/asset.rs
  • digital_asset_types/src/dapi/get_asset_changes.rs
  • digital_asset_types/src/dapi/mod.rs
  • digital_asset_types/src/rpc/response.rs
  • digital_asset_types/tests/get_asset_changes.rs
  • integration_tests/tests/data/accounts/get_asset_changes_basic/52J4tmbP9o3GHQrkts8vhXm84CLzM7KSXboKGnrw3YTW
  • integration_tests/tests/data/accounts/get_asset_changes_basic/CMVuYDS9nTeujfTPJb8ik7CRhAqZv4DfjfdamFLkJgxE
  • integration_tests/tests/data/accounts/get_asset_changes_basic/Fc1MSqW3eifoMuDupvimtu2DbSYmGUtSQ7wBUoz87uK5
  • integration_tests/tests/data/accounts/get_asset_changes_cursor_pagination/13YBWA4DyLJc8QZPAAajNLefise35GZ2pVG2R97n5NYa
  • integration_tests/tests/data/accounts/get_asset_changes_cursor_pagination/2NqdYX6kJmMUoChnDXU2UrP9BsoPZivRw3uJG8iDhRRd
  • integration_tests/tests/data/accounts/get_asset_changes_cursor_pagination/2iMQVnGpJfSRfyPSyDgnnGMwiCa9ap5j29VZvHuoABpp
  • integration_tests/tests/data/accounts/get_asset_changes_cursor_pagination/52J4tmbP9o3GHQrkts8vhXm84CLzM7KSXboKGnrw3YTW
  • integration_tests/tests/data/accounts/get_asset_changes_cursor_pagination/5rEeYv8R25b8j6YTHJvYuCKEzq44UCw1Wx1Wx2VPPLz1
  • integration_tests/tests/data/accounts/get_asset_changes_cursor_pagination/AhbbU3exDvvM9cBzHxEjpYt4sMfFsXuQ3FUXv9VPE4F7
  • integration_tests/tests/data/accounts/get_asset_changes_cursor_pagination/CMVuYDS9nTeujfTPJb8ik7CRhAqZv4DfjfdamFLkJgxE
  • integration_tests/tests/data/accounts/get_asset_changes_cursor_pagination/CxYMsK8m52REmAv8mre7TNJx2QLfHnVEdhnt4vht37q4
  • integration_tests/tests/data/accounts/get_asset_changes_cursor_pagination/EjkMvnYTrmpAWLVz55HHL3UxcZcPYwiSooLNPheLP1kk
  • integration_tests/tests/data/accounts/get_asset_changes_cursor_pagination/Fc1MSqW3eifoMuDupvimtu2DbSYmGUtSQ7wBUoz87uK5
  • integration_tests/tests/data/accounts/get_asset_changes_cursor_pagination/HTKAVZZrDdyecCxzm3WEkCsG1GUmiqKm73PvngfuYRNK
  • integration_tests/tests/data/accounts/get_asset_changes_cursor_pagination/cpdh48eio3EZFc3rZw7HR6P3thRr22qbcNXtvC9dC18
  • integration_tests/tests/integration_tests/asset_changes_tests.rs
  • integration_tests/tests/integration_tests/main.rs
  • migration/src/lib.rs
  • migration/src/m20260226_120101_add_idx_asset_slot_updated_id.rs

Comment on lines +882 to +896
let current_slot_stmt = Statement::from_string(
    DatabaseBackend::Postgres,
    "SELECT COALESCE(MAX(slot_updated), 0) AS slot FROM asset".to_string(),
);

#[derive(Debug, FromQueryResult)]
struct SlotRow {
    slot: i64,
}

let current_slot = SlotRow::find_by_statement(current_slot_stmt)
    .one(conn)
    .await?
    .map(|r| r.slot)
    .unwrap_or(0);

🧹 Nitpick | 🔵 Trivial

Consider performance implications of the MAX query.

The MAX(slot_updated) query performs a full table scan unless Postgres uses the new index in reverse. While Postgres can often optimize this, for very large asset tables this could add latency.

Consider whether this value could be cached or computed more efficiently if performance becomes a concern at scale. The current implementation is functionally correct.


Required assetTypes parameter (NFT, FungibleAsset, FungibleToken, Custom)
filters by specification_asset_class. Response items now include a type
field using the Interface enum and a creator field with the first verified
creator.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@digital_asset_types/src/dao/scopes/asset.rs`:
- Around line 887-893: The lateral join selecting a verified creator (alias
lat_cr from asset_creators ac) currently orders by ac.seq which is unreliable;
update the ORDER BY clause in that lateral subquery to use ac.position ASC
instead of ac.seq ASC so the creator selection matches the ordering used
elsewhere (see get_related_for_assets) and yields stable, metadata-based
ordering for both compressed and non-compressed assets.

In `@digital_asset_types/tests/get_asset_changes.rs`:
- Around line 133-167: Add a unit test that verifies negative i64 slot values
round-trip through the cursor encoder/decoder: create a new test (e.g.,
test_cursor_round_trip_negative_slot) that sets slot to a negative value (e.g.,
-1), an example asset_id, calls encode_change_cursor(slot, &asset_id), then
calls decode_change_cursor(&encoded).unwrap() and asserts decoded_slot == slot
and decoded_id == asset_id; this uses the existing encode_change_cursor and
decode_change_cursor helpers to ensure malformed/negative slots are preserved by
the round-trip.

In `@integration_tests/tests/integration_tests/asset_changes_tests.rs`:
- Around line 45-87: The test test_get_asset_changes_cursor_pagination currently
pages only twice; add a third fetch using api::GetAssetChanges
(AssetCategory::NFT, limit: Some(1), after: page2.after) and assert it returns
the final item (items.len() == 1) and that page3.after is None to confirm
exhaustion. Also strengthen ordering checks by comparing the keyset tuple
(slot_updated, id) from page1.items[0], page2.items[0], and page3.items[0] to
ensure non-decreasing order across pages (i.e., (slot_updated, id) <= next
page's tuple) rather than only comparing slot_updated.

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 72526cf and 2c35ae3.

📒 Files selected for processing (7)
  • das_api/src/api/api_impl.rs
  • das_api/src/api/mod.rs
  • digital_asset_types/src/dao/scopes/asset.rs
  • digital_asset_types/src/dapi/get_asset_changes.rs
  • digital_asset_types/src/rpc/response.rs
  • digital_asset_types/tests/get_asset_changes.rs
  • integration_tests/tests/integration_tests/asset_changes_tests.rs

Comment on lines +133 to +167
#[test]
fn test_cursor_round_trip() {
    let slot: i64 = 123_456_789;
    let asset_id: Vec<u8> = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];

    let encoded = encode_change_cursor(slot, &asset_id);
    let (decoded_slot, decoded_id) = decode_change_cursor(&encoded).unwrap();

    assert_eq!(decoded_slot, slot);
    assert_eq!(decoded_id, asset_id);
}

#[test]
fn test_cursor_round_trip_zero_slot() {
    let slot: i64 = 0;
    let asset_id: Vec<u8> = vec![255];

    let encoded = encode_change_cursor(slot, &asset_id);
    let (decoded_slot, decoded_id) = decode_change_cursor(&encoded).unwrap();

    assert_eq!(decoded_slot, slot);
    assert_eq!(decoded_id, asset_id);
}

#[test]
fn test_cursor_round_trip_max_slot() {
    let slot: i64 = i64::MAX;
    let asset_id: Vec<u8> = vec![0; 32]; // typical Solana pubkey size

    let encoded = encode_change_cursor(slot, &asset_id);
    let (decoded_slot, decoded_id) = decode_change_cursor(&encoded).unwrap();

    assert_eq!(decoded_slot, slot);
    assert_eq!(decoded_id, asset_id);
}

🧹 Nitpick | 🔵 Trivial

Consider adding a test for negative slot values in cursor round-trip.

The cursor uses i64 for the slot which allows negative values. While the API validates after_slot to be non-negative, a malformed cursor could theoretically encode a negative slot. Consider adding a test to verify behavior:

🧪 Optional: Test negative slot encoding
#[test]
fn test_cursor_round_trip_negative_slot() {
    // While the API rejects negative after_slot, cursors could theoretically encode them
    let slot: i64 = -1;
    let asset_id: Vec<u8> = vec![1, 2, 3];

    let encoded = encode_change_cursor(slot, &asset_id);
    let (decoded_slot, decoded_id) = decode_change_cursor(&encoded).unwrap();

    assert_eq!(decoded_slot, slot);
    assert_eq!(decoded_id, asset_id);
}

Comment on lines +45 to +87
#[tokio::test]
#[serial]
#[named]
async fn test_get_asset_changes_cursor_pagination() {
    let name = trim_test_name(function_name!());
    let setup = TestSetup::new(name.clone()).await;

    let seeds: Vec<SeedEvent> = seed_nfts([
        "CMVuYDS9nTeujfTPJb8ik7CRhAqZv4DfjfdamFLkJgxE",
        "HTKAVZZrDdyecCxzm3WEkCsG1GUmiqKm73PvngfuYRNK",
        "2NqdYX6kJmMUoChnDXU2UrP9BsoPZivRw3uJG8iDhRRd",
    ]);

    apply_migrations_and_delete_data(setup.db.clone()).await;
    index_seed_events(&setup, seeds.iter().collect_vec()).await;

    // Fetch first page with limit=1
    let request = api::GetAssetChanges {
        asset_types: vec![AssetCategory::NFT],
        limit: Some(1),
        after_slot: None,
        after: None,
    };
    let page1 = setup.das_api.get_asset_changes(request).await.unwrap();
    assert_eq!(page1.items.len(), 1);
    assert!(page1.after.is_some());

    // Fetch second page using cursor
    let request = api::GetAssetChanges {
        asset_types: vec![AssetCategory::NFT],
        limit: Some(1),
        after: page1.after.clone(),
        after_slot: None,
    };
    let page2 = setup.das_api.get_asset_changes(request).await.unwrap();
    assert_eq!(page2.items.len(), 1);

    // Pages should return different items
    assert_ne!(page1.items[0].id, page2.items[0].id);

    // Ordering: slot_updated should be non-decreasing across pages
    assert!(page2.items[0].slot_updated >= page1.items[0].slot_updated);
}

🧹 Nitpick | 🔵 Trivial

Cursor pagination test may not fully verify all items are retrievable.

The pagination test seeds 3 NFTs but only fetches 2 pages (2 items). Consider adding a third page fetch to verify the final item and confirm after cursor becomes None when all items are exhausted.

Additionally, the slot ordering assertion on line 86 only checks >= between page 2 and page 1, but with 3 seeded items at potentially the same slot, this doesn't strongly validate the keyset ordering by (slot_updated, id).

♻️ Consider extending the test
     // Pages should return different items
     assert_ne!(page1.items[0].id, page2.items[0].id);

     // Ordering: slot_updated should be non-decreasing across pages
     assert!(page2.items[0].slot_updated >= page1.items[0].slot_updated);
+
+    // Fetch third page
+    let request = api::GetAssetChanges {
+        asset_types: vec![AssetCategory::NFT],
+        limit: Some(1),
+        after: page2.after.clone(),
+        after_slot: None,
+    };
+    let page3 = setup.das_api.get_asset_changes(request).await.unwrap();
+    assert_eq!(page3.items.len(), 1);
+    assert_ne!(page2.items[0].id, page3.items[0].id);
+    assert!(page3.items[0].slot_updated >= page2.items[0].slot_updated);
+
+    // Fourth page should be empty
+    let request = api::GetAssetChanges {
+        asset_types: vec![AssetCategory::NFT],
+        limit: Some(1),
+        after: page3.after.clone(),
+        after_slot: None,
+    };
+    let page4 = setup.das_api.get_asset_changes(request).await.unwrap();
+    assert!(page4.items.is_empty());
+    assert!(page4.after.is_none());


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
integration_tests/tests/integration_tests/asset_changes_tests.rs (1)

82-103: ⚠️ Potential issue | 🟡 Minor

Add an explicit exhaustion fetch after page3.

The test still doesn’t verify terminal pagination behavior. Please fetch one more page with after: page3.after and assert empty items plus after.is_none().

♻️ Suggested test extension
     let page3 = setup.das_api.get_asset_changes(request).await.unwrap();
     assert_eq!(page3.items.len(), 1);
@@
     assert!(key1 <= key2, "page1 -> page2 ordering violated");
     assert!(key2 <= key3, "page2 -> page3 ordering violated");
+
+    // Fetch fourth page to verify exhaustion
+    let request = api::GetAssetChanges {
+        asset_types: vec![AssetCategory::NFT],
+        limit: Some(1),
+        after: page3.after.clone(),
+        after_slot: None,
+    };
+    let page4 = setup.das_api.get_asset_changes(request).await.unwrap();
+    assert!(page4.items.is_empty());
+    assert!(page4.after.is_none());
 }
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@digital_asset_types/src/dao/scopes/asset.rs`:
- Around line 879-886: The lateral subquery that selects collection (the SELECT
ag.group_value FROM asset_grouping ag ... LIMIT 1 as lat) is non-deterministic;
modify that subquery to explicitly ORDER BY the freshness/timestamp columns
before LIMIT 1 (e.g., ORDER BY ag.updated_at DESC NULLS LAST, ag.id DESC) so the
newest grouping row is chosen deterministically; update the lateral SELECT that
produces lat to include this ORDER BY.
- Around line 887-893: The LEFT JOIN LATERAL selecting the creator from
asset_creators (alias ac) currently uses "ac.verified = true" and thus omits
legacy verified IS NULL rows and lacks stale-row filtering; update the WHERE
clause in this lateral subquery to use the same predicate used for
current-creator semantics (the predicate implemented around lines 607–637) so it
includes legacy verified IS NULL cases and applies the stale-row filter, then
keep the ORDER BY ac.position ASC LIMIT 1 to pick the first by position.

---

Duplicate comments:
In `@integration_tests/tests/integration_tests/asset_changes_tests.rs`:
- Around line 82-103: Add an explicit exhaustion check after the existing page3
fetch: call setup.das_api.get_asset_changes with GetAssetChanges { asset_types:
vec![AssetCategory::NFT], limit: Some(1), after: page3.after.clone(),
after_slot: None } (store result as page4), then assert page4.items.is_empty()
and assert!(page4.after.is_none()) to verify terminal pagination behavior; place
this right after the existing page3 ordering assertions in the same test.

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2c35ae3 and 27b825c.

📒 Files selected for processing (2)
  • digital_asset_types/src/dao/scopes/asset.rs
  • integration_tests/tests/integration_tests/asset_changes_tests.rs

Comment on lines +879 to +886
```sql
LEFT JOIN LATERAL (
    SELECT ag.group_value
    FROM asset_grouping ag
    WHERE ag.asset_id = a.id
      AND ag.group_key = 'collection'
      AND (ag.verified = true OR ag.verified IS NULL)
    LIMIT 1
) lat ON true
```

⚠️ Potential issue | 🟠 Major

The collection lateral join is nondeterministic with LIMIT 1 alone.

Without ORDER BY, Postgres can return any matching grouping row, so collection may flap when multiple rows exist for an asset. Please add deterministic freshness ordering before LIMIT 1.

Based on learnings: stale group rows may remain after upserts, so this join must explicitly select the newest row deterministically.
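For illustration, the deterministic pick this comment asks for can be modeled in plain Rust rather than SQL: among candidate grouping rows, take the newest by a freshness column with ties broken on id, instead of trusting an unordered LIMIT 1. The field names (`updated_at`, `id`) mirror the suggested ORDER BY and are assumptions, not the actual schema.

```rust
#[derive(Debug)]
struct GroupingRow {
    id: i64,
    updated_at: Option<i64>, // nullable, hence the NULLS LAST in the suggestion
    group_value: &'static str,
}

// Equivalent of ORDER BY updated_at DESC NULLS LAST, id DESC ... LIMIT 1:
// Option<i64> orders None below Some, so max_by_key naturally puts NULLs last.
fn pick_newest(rows: &[GroupingRow]) -> Option<&GroupingRow> {
    rows.iter().max_by_key(|r| (r.updated_at, r.id))
}

fn main() {
    let rows = [
        GroupingRow { id: 1, updated_at: None, group_value: "legacy" },
        GroupingRow { id: 2, updated_at: Some(100), group_value: "older-id" },
        GroupingRow { id: 3, updated_at: Some(100), group_value: "newest" },
    ];
    // Same updated_at on ids 2 and 3: the id tiebreaker makes the pick stable.
    assert_eq!(pick_newest(&rows).unwrap().group_value, "newest");
}
```

Without the tiebreaker, two rows sharing the maximum `updated_at` would make the result flap between queries, which is exactly the nondeterminism the comment flags.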

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@digital_asset_types/src/dao/scopes/asset.rs` around lines 879-886: the
lateral subquery that selects collection (the SELECT ag.group_value FROM
asset_grouping ag ... LIMIT 1 as lat) is non-deterministic; modify that subquery
to explicitly ORDER BY the freshness/timestamp columns before LIMIT 1 (e.g.,
ORDER BY ag.updated_at DESC NULLS LAST, ag.id DESC) so the newest grouping row
is chosen deterministically; update the lateral SELECT that produces lat to
include this ORDER BY.

Comment on lines +887 to +893
```sql
LEFT JOIN LATERAL (
    SELECT ac.creator
    FROM asset_creators ac
    WHERE ac.asset_id = a.id AND ac.verified = true
    ORDER BY ac.position ASC
    LIMIT 1
) lat_cr ON true
```
@coderabbitai coderabbitai bot Feb 27, 2026


⚠️ Potential issue | 🟠 Major

Creator selection can return stale rows or miss legacy data.

This subquery ignores stale-row filtering and excludes legacy verified IS NULL rows. Align it with current-creator semantics (see lines 607-637) before picking the first by position.

🛠️ Suggested SQL adjustment (minimum fix)
```diff
             LEFT JOIN LATERAL (
                 SELECT ac.creator
                 FROM asset_creators ac
-               WHERE ac.asset_id = a.id AND ac.verified = true
-               ORDER BY ac.position ASC
+               WHERE ac.asset_id = a.id
+                 AND (ac.verified = true OR ac.verified IS NULL)
+               ORDER BY ac.slot_updated DESC, ac.position ASC
                 LIMIT 1
             ) lat_cr ON true
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@digital_asset_types/src/dao/scopes/asset.rs` around lines 887-893: the LEFT
JOIN LATERAL selecting the creator from asset_creators (alias ac) currently uses
"ac.verified = true" and thus omits legacy verified IS NULL rows and lacks
stale-row filtering; update the WHERE clause in this lateral subquery to use the
same predicate used for current-creator semantics (the predicate implemented
around lines 607–637) so it includes legacy verified IS NULL cases and applies
the stale-row filter, then keep the ORDER BY ac.position ASC LIMIT 1 to pick the
first by position.

joenoon (Author)

I don't think this applies here. asset_creators.verified is a non-nullable bool column (unlike asset_grouping.verified, which is Option<bool>), so the IS NULL guard would be dead code. And adding slot_updated DESC before position ASC would change the semantics to pick the most-recently-updated creator rather than the first-by-position, which is inconsistent with how creator ordering works elsewhere in the codebase (e.g., get_related_for_assets and the single-asset query both order purely by position ASC). But please let me know.


🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Find the asset_creators entity/model definition
echo "=== Looking for asset_creators Model definition ==="
rg -A 20 "pub struct.*asset_creators.*Model" --type rust

echo -e "\n=== Looking for asset_creators migration ==="
rg -A 30 "create_table.*asset_creators" --type rust

echo -e "\n=== Looking for asset_creators verified column definition ==="
rg "verified.*bool" --type rust -C 3 | grep -A 3 -B 3 "asset_creators"
```

Length of output: 1243


🏁 Script executed:

```bash
#!/bin/bash
# Check the filter_out_stale_creators function implementation
echo "=== filter_out_stale_creators implementation ==="
rg -A 30 "fn filter_out_stale_creators" --type rust

echo -e "\n=== Check how get_related_for_assets uses creators ==="
rg -B 5 -A 10 "asset_creators::Entity::find" digital_asset_types/src/dao/scopes/asset.rs | head -40

echo -e "\n=== Check ORDER BY in get_by_id for creators ==="
rg -B 3 -A 5 "asset_creators::Entity::find.*filter.*asset\.id" --type rust
```

Length of output: 4261


@joenoon You're absolutely right on all points, and I apologize for the incorrect suggestion. I've now verified:

  1. ✅ asset_creators.verified is indeed a non-nullable bool column, so the IS NULL guard would be dead code.

  2. ✅ Adding slot_updated DESC before position ASC would incorrectly change the semantics to pick the most-recently-updated creator rather than the first-by-position.

  3. ✅ Both get_related_for_assets and get_by_id fetch creators ordered purely by position ASC, then apply filter_out_stale_creators() afterward.

My suggestion was misguided. I was concerned that the LATERAL subquery doesn't apply the stale-creator filtering logic from filter_out_stale_creators() (which filters by max slot_updated and max seq), but:

  • Replicating that complex filtering in SQL would be non-trivial
  • For a high-throughput changes feed, keeping the query simple is likely the right tradeoff
  • This is a design decision rather than a correctness issue

The current implementation (verified = true, ordered by position ASC, LIMIT 1) is consistent and reasonable. Thank you for the clarification!
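The stale-row filtering discussed above can be sketched in standalone Rust, based only on the behavior described in this thread: keep creator rows at the maximum slot_updated, then at the maximum seq, and order the survivors by position ASC. Field names and the exact filter shape are assumptions mirroring the thread's description of filter_out_stale_creators(), not the real implementation.

```rust
#[derive(Debug, Clone)]
struct CreatorRow {
    creator: &'static str,
    position: i16,
    slot_updated: i64,
    seq: i64,
}

fn current_creators(mut rows: Vec<CreatorRow>) -> Vec<CreatorRow> {
    // Drop rows not at the newest slot, then not at the newest seq.
    if let Some(max_slot) = rows.iter().map(|r| r.slot_updated).max() {
        rows.retain(|r| r.slot_updated == max_slot);
    }
    if let Some(max_seq) = rows.iter().map(|r| r.seq).max() {
        rows.retain(|r| r.seq == max_seq);
    }
    // First-by-position semantics, matching get_related_for_assets / get_by_id.
    rows.sort_by_key(|r| r.position);
    rows
}

fn main() {
    let rows = vec![
        CreatorRow { creator: "stale", position: 0, slot_updated: 9, seq: 4 },
        CreatorRow { creator: "b", position: 1, slot_updated: 10, seq: 5 },
        CreatorRow { creator: "a", position: 0, slot_updated: 10, seq: 5 },
    ];
    let current = current_creators(rows);
    assert_eq!(current.len(), 2);
    assert_eq!(current[0].creator, "a"); // first by position among fresh rows
}
```

As the bot notes, replicating this two-stage filter inside the LATERAL subquery would be non-trivial, which is why the changes-feed query keeps the simpler `verified = true ORDER BY position` form.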


✏️ Learnings added
Learnt from: joenoon
Repo: metaplex-foundation/digital-asset-rpc-infrastructure PR: 269
File: digital_asset_types/src/dao/scopes/asset.rs:887-893
Timestamp: 2026-02-27T23:26:56.117Z
Learning: In the digital-asset-rpc-infrastructure codebase, asset_creators.verified is a non-nullable bool column, not an Option<bool>, so predicates like "verified IS NULL" are dead code and should not be suggested.

Learnt from: joenoon
Repo: metaplex-foundation/digital-asset-rpc-infrastructure PR: 269
File: digital_asset_types/src/dao/scopes/asset.rs:887-893
Timestamp: 2026-02-27T23:26:56.118Z
Learning: In the digital-asset-rpc-infrastructure codebase, stale creator filtering is performed by the filter_out_stale_creators() function which filters by max slot_updated and max seq values. The existing query patterns (get_related_for_assets, get_by_id) order creators purely by position ASC, not by slot_updated DESC, to ensure deterministic first-by-position semantics.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

