
Add das-grpc-ingest crate#240

Open
Nagaprasadvr wants to merge 83 commits into metaplex-foundation:main from rpcpool:grpc-cleanup

Conversation


@Nagaprasadvr Nagaprasadvr commented Feb 10, 2025

Goal

Reduce the operational cost and complexity of running DAS by switching to the Dragonmouth gRPC stream for account and transaction updates.

Upgrade the ingestion engine by refactoring configuration, workers, and metrics.

Approach

Create a new crate named das-grpc-ingest with the following primary subcommands:

grpc2redis - connects to Dragonmouth and pushes relevant events into a Redis stream
ingest - subscribes to the Redis streams, spawns workers, and processes events using the program_transformers crate
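The grpc2redis routing described above can be sketched as follows. This is a minimal, std-only model for illustration: the enum shape, stream names ("ACCOUNTS", "TRANSACTIONS"), and the in-memory `xadd` are assumptions standing in for the crate's actual yellowstone-grpc types and real Redis XADD calls.

```rust
use std::collections::HashMap;

// Simplified stand-in for a Dragonmouth update; the real crate uses
// yellowstone-grpc protobuf types (this shape is hypothetical).
#[derive(Debug, Clone)]
enum GeyserUpdate {
    Account { pubkey: String, data: Vec<u8> },
    Transaction { signature: String, data: Vec<u8> },
}

// Route an update to the Redis stream name the ingester would read from.
// The stream names here are illustrative, not the crate's configured values.
fn stream_for(update: &GeyserUpdate) -> &'static str {
    match update {
        GeyserUpdate::Account { .. } => "ACCOUNTS",
        GeyserUpdate::Transaction { .. } => "TRANSACTIONS",
    }
}

// In-memory model of XADD: append the payload to the named stream.
fn xadd(streams: &mut HashMap<&'static str, Vec<Vec<u8>>>, update: GeyserUpdate) {
    let name = stream_for(&update);
    let payload = match update {
        GeyserUpdate::Account { data, .. } => data,
        GeyserUpdate::Transaction { data, .. } => data,
    };
    streams.entry(name).or_default().push(payload);
}

fn main() {
    let mut streams: HashMap<&'static str, Vec<Vec<u8>>> = HashMap::new();
    xadd(&mut streams, GeyserUpdate::Account { pubkey: "abc".into(), data: vec![1] });
    xadd(&mut streams, GeyserUpdate::Transaction { signature: "sig".into(), data: vec![2] });
    println!("accounts queued: {}", streams["ACCOUNTS"].len());
}
```

The point of the split is that the ingest subcommand only ever sees named Redis streams, so it is decoupled from the gRPC connection details.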

closes #83

Next Steps

  • Remove statsd as a dependency of program_transformers. Make the metrics backend the responsibility of the ingest runtime so that Prometheus can be used for grpc-ingest while nft_ingester continues to use statsd.
  • Organize the crates, renaming nft_ingester to das-plerkle-ingest.
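One way the metrics next step above could look: library code reports through a trait and each runtime supplies its own backend. The trait, method names, and metric names here are hypothetical; they only illustrate the decoupling, not a committed design.

```rust
use std::collections::HashMap;

// Hypothetical backend abstraction: program_transformers would report
// through this trait, and each runtime (statsd for nft_ingester,
// Prometheus for grpc-ingest) would supply an implementation.
trait MetricsBackend {
    fn incr(&mut self, name: &str, value: u64);
}

// Test double that records calls; a real deployment would wrap a
// statsd client or a Prometheus counter instead.
#[derive(Default)]
struct RecordingBackend {
    counts: HashMap<String, u64>,
}

impl MetricsBackend for RecordingBackend {
    fn incr(&mut self, name: &str, value: u64) {
        *self.counts.entry(name.to_string()).or_insert(0) += value;
    }
}

// Library code stays backend-agnostic: it only sees the trait object.
fn record_ingested(metrics: &mut dyn MetricsBackend, n: u64) {
    metrics.incr("ingest.events", n);
}

fn main() {
    let mut backend = RecordingBackend::default();
    record_ingested(&mut backend, 3);
    println!("ingest.events = {}", backend.counts["ingest.events"]);
}
```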

Diagram

DAS API Diagram

fanatid and others added 30 commits April 16, 2024 11:47
…the main thread keeps a cache of seen events so that it doesn't push to ingest if an event was already seen by another connection
…flow. Also pull in backfill to be used in ingest
…est stream which handles reading and processing messages.
@Nagaprasadvr Nagaprasadvr requested a review from danenbm February 17, 2025 09:35
@Nagaprasadvr Nagaprasadvr marked this pull request as ready for review February 20, 2025 06:28
@kespinola kespinola changed the title cleanup grpc-ingest branch and limit external non-dep code changes Add das-grpc-ingest crate Feb 24, 2025
kespinola
kespinola previously approved these changes Feb 24, 2025
@kespinola kespinola mentioned this pull request Feb 24, 2025
cargo tree
git checkout Cargo.lock
cargo tree --frozen
cargo tree

https://github.com/metaplex-foundation/digital-asset-rpc-infrastructure/pull/240/files#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542R144

When we update yellowstone-grpc to a later version we start getting dependency conflicts. We will be looking at resolving this, but would like to do it in a follow-up.

vergen = "8.2.1"
wasi = "0.7.0"
wasm-bindgen = "0.2.83"
yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.15.1+solana.1.18.22" } # tag is geyser plugin

@kespinola told me these were being moved to https://crates.io/crates/yellowstone-grpc-kafka so could we update these to use that crate?


It didn't work out with the dependencies, so we are doing a new release that resolves this dependency issue, and will update the PR soon.


@danenbm danenbm left a comment


LGTM except for one comment on the crate imports


coderabbitai bot commented Apr 25, 2025

Summary by CodeRabbit

  • New Features

    • Introduced a new gRPC ingestion service (grpc-ingest) for processing Solana change events via Yellowstone GRPC endpoints and integrating with Redis and Postgres.
    • Added concurrent, retry-capable downloading and storage of digital asset metadata JSON.
    • Integrated Prometheus metrics for real-time monitoring, with a new Prometheus service in Docker Compose.
    • Added comprehensive configuration and setup documentation for new ingestion components.
  • Improvements

    • Expanded dependency management and workspace structure to support new ingestion features.
    • Enhanced modularity by centralizing shared types and logic in a new core library.
    • Updated asset data handling to better signal when metadata downloads are needed.
  • Bug Fixes

    • Minor adjustments to workflow and Dockerfiles to support new build and runtime requirements.
  • Documentation

    • Added and updated READMEs and example configuration files for easier onboarding and setup.
  • Refactor

    • Refactored codebase to move shared types to a core library and streamline public API exposure.


Walkthrough

This change introduces a new gRPC-based indexer component (grpc-ingest) to the project, adding support for ingesting Solana change events from Yellowstone gRPC endpoints and pushing them into Redis and Postgres. The update includes new configuration files, Docker and Compose integration, Prometheus monitoring setup, and a comprehensive Rust implementation for the ingestion pipeline, covering gRPC subscription, Redis stream handling, concurrent metadata JSON downloading, and database updating. Existing code is refactored to integrate with the new ingestion model, and dependencies are updated to support gRPC, telemetry, and related utilities. Documentation is updated to reflect the new architecture and usage.
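The walkthrough above mentions concurrent, retry-capable downloading of metadata JSON. A minimal, std-only sketch of the retry core follows; the linear backoff schedule, attempt limit, and function name are assumptions for illustration, and the actual core/src/metadata_json.rs implementation may differ (it is async and performs real HTTP requests).

```rust
use std::time::Duration;

// Retry a fallible operation up to `max_attempts` times, sleeping with a
// linearly growing delay between attempts. Returns the first success, or
// the last error once attempts are exhausted.
fn retry<T, E>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        attempt += 1;
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_attempts => return Err(e),
            // Back off linearly: delay grows with each failed attempt.
            Err(_) => std::thread::sleep(base_delay * attempt),
        }
    }
}

fn main() {
    let mut calls = 0;
    // Simulated downloader: fails twice, then succeeds on the third attempt.
    let result = retry(5, Duration::from_millis(1), || {
        calls += 1;
        if calls < 3 { Err("transient") } else { Ok("metadata json") }
    });
    println!("{:?} after {} calls", result, calls);
}
```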

Changes

File(s) / Path(s) Change Summary
.github/workflows/test.yml Adjusted workflow trigger quoting and removed --frozen from cargo tree command.
Builder.Dockerfile, Proxy.Dockerfile Added grpc-ingest directory to Docker build context.
Cargo.toml Added grpc-ingest as a workspace member; introduced new dependencies for gRPC, telemetry, and utilities.
core/Cargo.toml Added and reorganized dependencies; no code changes.
core/src/lib.rs Declared and re-exported new metadata_json module.
core/src/metadata_json.rs New module for concurrent metadata JSON downloading, retrying, and DB updating.
docker-compose.yaml Added Prometheus service for monitoring.
grpc-ingest/* (new directory and contents) Introduced new gRPC ingest component: configs, build script, main logic, modules for config, gRPC, ingestion, Redis, Prometheus, tracing, utilities, and version info.
nft_ingester/Cargo.toml Added das-core as a dependency.
nft_ingester/src/tasks/common/mod.rs Changed import source for DownloadMetadataInfo and DownloadMetadataNotifier to das_core.
ops/src/bubblegum/README.md Removed trailing newline.
program_transformers/Cargo.toml Added das-core and serde dependencies; reformatted features.
program_transformers/src/bubblegum/db.rs Refactored upsert_asset_data to return Option<DownloadMetadataInfo>; removed parameters for metadata and reindex.
program_transformers/src/bubblegum/mint_v1.rs Added new mint_v1 function for Bubblegum minting, returning Option<DownloadMetadataInfo>.
program_transformers/src/bubblegum/update_metadata.rs Updated call to upsert_asset_data to match new signature.
program_transformers/src/lib.rs Removed local DownloadMetadataInfo and DownloadMetadataNotifier in favor of imports from das_core; updated derives.
program_transformers/src/token_extensions/mod.rs Refactored return of DownloadMetadataInfo to use constructor.
prometheus-config.yaml Added Prometheus scrape configuration for local endpoints.
README.md Added section describing the new grpc-ingest component.

Sequence Diagram(s)

sequenceDiagram
    participant YellowstoneGRPC as Yellowstone gRPC Endpoint
    participant GrpcIngest as grpc-ingest (grpc2redis)
    participant Redis as Redis
    participant Ingester as grpc-ingest (ingester)
    participant Postgres as Postgres

    YellowstoneGRPC->>GrpcIngest: Stream account/tx updates via gRPC
    GrpcIngest->>Redis: Push updates to Redis streams

    Ingester->>Redis: Read updates from Redis streams
    Ingester->>Postgres: Process and store data (accounts, tx, snapshots)
    Ingester->>YellowstoneGRPC: (optional) Fetch snapshots or missing blocks

    loop Metadata Download
        Ingester->>External: Download metadata JSON from URI
        Ingester->>Postgres: Update asset metadata in DB
    end

Assessment against linked issues

Objective (Issue #) Addressed Explanation
Implement a gRPC-based indexer for Solana events using Yellowstone gRPC (account/tx updates, multi-endpoint, etc.) [#83]
Support concurrent ingestion, Redis streaming, and Postgres integration for indexer [#83]
Add Prometheus monitoring and configuration for indexer components [#83]
Update documentation and configs to reflect new gRPC ingestion architecture [#83]

Suggested reviewers

  • pmantica11
  • NicolasPennie
  • andrii-kl

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Lite

📥 Commits

Reviewing files that changed from the base of the PR and between 74d0481 and b9e3ebe.

📒 Files selected for processing (1)
  • program_transformers/src/bubblegum/mint.rs (1 hunks)
🔇 Additional comments (4)
program_transformers/src/bubblegum/mint.rs (4)

28-28: Import statement cleaned up appropriately.

The import statement for sea_orm has been simplified to only include the needed traits ConnectionTrait and TransactionTrait, removing unnecessary imports. This aligns with best practices for keeping imports focused on what's actually used in the file.


89-101: Function call signature matches updated implementation.

The upsert_asset_data call has been updated to match the revised function signature mentioned in the AI summary, which now returns Option<DownloadMetadataInfo>. The function is properly used in the transaction flow.


190-196: Appropriate error handling for empty URIs.

Good defensive programming practice to check for empty URIs before attempting to download metadata. The warning log provides useful context for debugging.


198-198: Clean implementation of return value.

The function now properly returns Some(DownloadMetadataInfo) with the asset ID and URI, which aligns with the changes described in the AI summary. This allows downstream components to process metadata downloads when appropriate.


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 36

🔭 Outside diff range comments (2)
grpc-ingest/config-monitor.example.yml (1)

1-21: 🧹 Nitpick (assertive)

Configuration file looks good, consider adding documentation for only_trees parameter.

The configuration file is well-structured with clear comments for each section. However, the only_trees: null parameter lacks explanation about its purpose and valid values.

 # bubblegum merkle tree configuration
 bubblegum:
-  only_trees: null
+  # set to an array of tree IDs to monitor only specific trees, or null to monitor all trees
+  only_trees: null
grpc-ingest/config-ingester.example.yml (1)

74-75: 🧹 Nitpick (assertive)

Remove excessive blank lines

Remove the excessive blank line at the end of the file to fix the YAML linting error.

   # pipeline max idle time in milliseconds
   pipeline_max_idle_ms: 150
-
🧰 Tools
🪛 YAMLlint (1.35.1)

[error] 74-74: too many blank lines

(1 > 0) (empty-lines)

♻️ Duplicate comments (4)
.github/workflows/test.yml (1)

38-42: Removed --frozen flag from cargo tree command

Removing the --frozen flag allows the workflow to update and download missing dependencies, which aligns with handling the new dependency requirements.

When we update yellowstone-grpc to later package we start getting dependency conflicts. We will be looking at resolving this but would like to do in a follow up.

README.md (1)

18-20: 🧹 Nitpick (assertive)

Update section to fix grammar and formatting for consistency.

The GRPC-INGEST section has some formatting and grammatical issues:

  1. Inconsistent capitalization: "GRPC-INGEST" vs "grpc ingester"
  2. Missing comma in the second sentence
  3. "are able to" can be simplified to "can"
-#### GRPC-INGEST [/grpc-ingest/README.md](/grpc-ingest/README.md)
-
-The grpc ingester allows the DAS index stack to receive solana change events from a Yellowstone GRPC endpoint. All the largest RPC providers offer this API so you are able to use a shared event stream instead of running a dedicated validator like you have to with the plerkle geyser plugin.
+#### gRPC-INGEST [/grpc-ingest/README.md](/grpc-ingest/README.md)
+
+The gRPC ingester allows the DAS index stack to receive Solana change events from a Yellowstone gRPC endpoint. All the largest RPC providers offer this API, so you can use a shared event stream instead of running a dedicated validator like you have to with the plerkle geyser plugin.

Note: According to previous review comments, consider moving this content to the gRPC-ingest README instead of keeping it in the main README.

🧰 Tools
🪛 LanguageTool

[uncategorized] ~20-~20: A comma might be missing here.
Context: ...ll the largest RPC providers offer this API so you are able to use a shared event s...

(AI_EN_LECTOR_MISSING_PUNCTUATION_COMMA)


[style] ~20-~20: As a shorter alternative for ‘able to’, consider using “can”.
Context: ...est RPC providers offer this API so you are able to use a shared event stream instead of ru...

(BE_ABLE_TO)

docker-compose.yaml (1)

1-133: Missing grpc-ingest in Dockerfiles

It appears that grpc-ingest needs to be added to some Dockerfiles as mentioned in a previous review.

#!/bin/bash
# Check if grpc-ingest is properly included in the Dockerfiles

echo "Checking Dockerfiles for grpc-ingest inclusion:"
grep -l "COPY grpc-ingest" *.Dockerfile || echo "grpc-ingest not found in any Dockerfile"
grpc-ingest/src/version.rs (1)

14-22: Consider consistent naming for environment variable fields.

Based on previous review comments, the field names rustc and buildts might have been intended to be named differently, but keep them as is if renaming causes errors.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 93f9c8b and 74d0481.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (36)
  • .github/workflows/test.yml (2 hunks)
  • Builder.Dockerfile (1 hunks)
  • Cargo.toml (7 hunks)
  • Proxy.Dockerfile (1 hunks)
  • README.md (1 hunks)
  • core/Cargo.toml (1 hunks)
  • core/src/lib.rs (1 hunks)
  • core/src/metadata_json.rs (1 hunks)
  • docker-compose.yaml (1 hunks)
  • grpc-ingest/.gitignore (1 hunks)
  • grpc-ingest/Cargo.toml (1 hunks)
  • grpc-ingest/README.md (1 hunks)
  • grpc-ingest/build.rs (1 hunks)
  • grpc-ingest/config-grpc2redis.example.yml (1 hunks)
  • grpc-ingest/config-ingester.example.yml (1 hunks)
  • grpc-ingest/config-monitor.example.yml (1 hunks)
  • grpc-ingest/src/config.rs (1 hunks)
  • grpc-ingest/src/grpc.rs (1 hunks)
  • grpc-ingest/src/ingester.rs (1 hunks)
  • grpc-ingest/src/main.rs (1 hunks)
  • grpc-ingest/src/postgres.rs (1 hunks)
  • grpc-ingest/src/prom.rs (1 hunks)
  • grpc-ingest/src/redis.rs (1 hunks)
  • grpc-ingest/src/tracing.rs (1 hunks)
  • grpc-ingest/src/util.rs (1 hunks)
  • grpc-ingest/src/version.rs (1 hunks)
  • nft_ingester/Cargo.toml (1 hunks)
  • nft_ingester/src/tasks/common/mod.rs (1 hunks)
  • ops/src/bubblegum/README.md (1 hunks)
  • program_transformers/Cargo.toml (1 hunks)
  • program_transformers/src/bubblegum/db.rs (3 hunks)
  • program_transformers/src/bubblegum/mint_v1.rs (1 hunks)
  • program_transformers/src/bubblegum/update_metadata.rs (1 hunks)
  • program_transformers/src/lib.rs (2 hunks)
  • program_transformers/src/token_extensions/mod.rs (1 hunks)
  • prometheus-config.yaml (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
program_transformers/src/bubblegum/db.rs (1)
core/src/metadata_json.rs (3)
  • new (43-49)
  • new (53-58)
  • new (276-278)
grpc-ingest/src/grpc.rs (4)
grpc-ingest/src/ingester.rs (5)
  • config (67-70)
  • futures (263-271)
  • connection (72-75)
  • new (37-45)
  • start (77-178)
grpc-ingest/src/redis.rs (18)
  • config (376-379)
  • redis (325-329)
  • tokio (424-424)
  • tokio (425-425)
  • tokio (428-428)
  • tokio (429-429)
  • connection (381-384)
  • new (231-236)
  • new (248-250)
  • new (278-280)
  • new (313-315)
  • clone (240-242)
  • clone (270-272)
  • clone (301-303)
  • default (649-654)
  • start (399-639)
  • flush (668-678)
  • xadd_maxlen (658-666)
grpc-ingest/src/prom.rs (3)
  • grpc_tasks_total_dec (265-267)
  • grpc_tasks_total_inc (261-263)
  • redis_xadd_status_inc (208-216)
grpc-ingest/src/util.rs (1)
  • create_shutdown (7-19)
🪛 markdownlint-cli2 (0.17.2)
ops/src/bubblegum/README.md

86-86: Files should end with a single newline character
null

(MD047, single-trailing-newline)

grpc-ingest/README.md

1-1: First line in a file should be a top-level heading
null

(MD041, first-line-heading, first-line-h1)


15-15: Bare URL used
null

(MD034, no-bare-urls)


36-36: Trailing punctuation in heading
Punctuation: ':'

(MD026, no-trailing-punctuation)


38-38: Bare URL used
null

(MD034, no-bare-urls)


54-54: Emphasis used instead of a heading
null

(MD036, no-emphasis-as-heading)

🪛 LanguageTool
README.md

[uncategorized] ~20-~20: A comma might be missing here.
Context: ...ll the largest RPC providers offer this API so you are able to use a shared event s...

(AI_EN_LECTOR_MISSING_PUNCTUATION_COMMA)


[style] ~20-~20: As a shorter alternative for ‘able to’, consider using “can”.
Context: ...est RPC providers offer this API so you are able to use a shared event stream instead of ru...

(BE_ABLE_TO)

grpc-ingest/README.md

[grammar] ~24-~24: The word “setup” is a noun. The verb is spelled with a space.
Context: ...hese files and modify them as needed to setup the project. ### Run grpc2redis serv...

(NOUN_VERB_CONFUSION)

🪛 YAMLlint (1.35.1)
grpc-ingest/config-ingester.example.yml

[error] 74-74: too many blank lines

(1 > 0) (empty-lines)

⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: build-api (ubuntu-22.04)
🔇 Additional comments (40)
prometheus-config.yaml (2)

1-4: Validate aggressive global scrape/evaluation intervals
You’ve set scrape_interval: 1s and evaluation_interval: 5s, which can be very resource-intensive. Confirm that your endpoints and network can handle the 1s scrape cadence without dropping metrics or overwhelming Prometheus.


5-14: Verify scrape targets and hostname resolution
The static targets use host.docker.internal, which relies on Docker DNS resolution. Ensure this will resolve correctly in all deployment environments (e.g., Linux hosts vs. CI runners), or consider using network aliases or service names defined in Docker Compose.

nft_ingester/src/tasks/common/mod.rs (1)

5-5:

Details

✅ Verification successful

Verify updated import and workspace dependencies
You’ve switched DownloadMetadataInfo and DownloadMetadataNotifier from program_transformers to das_core. Please ensure that das-core is added to nft_ingester’s Cargo.toml and that all old references to program_transformers are removed to prevent build errors.


🏁 Script executed:

#!/bin/bash
# Check for das-core in nft_ingester Cargo.toml
rg "das-core" -n nft_ingester/Cargo.toml
# Ensure no program_transformers imports remain
rg "program_transformers" -n nft_ingester/src/tasks/common/mod.rs

Length of output: 141


Dependencies Verified and Cleaned Up

  • Confirmed das-core = { workspace = true } is present at line 17 in nft_ingester/Cargo.toml.
  • No remaining program_transformers imports in nft_ingester/src/tasks/common/mod.rs.

All set—no further changes needed.

Builder.Dockerfile (1)

12-12:

Details

✅ Verification successful

Ensure new crate is included in the workspace
You’ve added COPY grpc-ingest /rust/grpc-ingest, but the build will fail if the root Cargo.toml doesn’t list grpc-ingest as a workspace member. Please verify that grpc-ingest is present under [workspace].members.


🏁 Script executed:

#!/bin/bash
# Confirm grpc-ingest is a workspace member
rg -n "members\s*=\s*\[.*grpc-ingest" -n Cargo.toml

Length of output: 53


I couldn’t confirm grpc-ingest in the workspace with the previous query. Let’s search all Cargo.toml files and inspect the workspace section:


🏁 Script executed:

#!/bin/bash
# Search for grpc-ingest references in all Cargo.toml
rg -n "grpc-ingest" --glob "Cargo.toml" .

# Show the workspace section (members list) from the root Cargo.toml for context
rg -n "^\[workspace\]" -A10 --glob "Cargo.toml" .

Length of output: 519


Verified grpc-ingest in workspace
grpc-ingest is already listed under [workspace].members in Cargo.toml (line 12). No further action needed.

core/src/lib.rs (2)

1-3: Module declaration looks good

The addition of the metadata_json module follows the consistent pattern of module declarations in this file.


7-9: Appropriate re-export pattern

The export of the metadata_json module's public items follows the same pattern used for other modules.

Proxy.Dockerfile (1)

9-11: Dockerfile update is correctly implemented

The addition of the new grpc-ingest directory to the Docker build context correctly aligns with the PR's goal of introducing a new gRPC-based indexer component.

nft_ingester/Cargo.toml (1)

16-18: Dependency addition looks good

Adding the das-core workspace dependency is appropriate for enabling the use of shared types that were moved from program_transformers to das-core.

.github/workflows/test.yml (1)

10-12: String quoting style change

This is a minor style change from double-quotes to single-quotes that doesn't affect functionality.

program_transformers/src/bubblegum/update_metadata.rs (3)

26-27: Sea-orm import simplified

Removal of the JsonValue import is consistent with the refactoring of the upsert_asset_data function which now handles metadata field setting internally.


117-129: Updated function call signature

The upsert_asset_data call no longer includes the metadata and reindex parameters, which is consistent with the function signature changes. The function now internally sets the metadata field to "processing" and reindex to true.


190-191: Proper use of the new DownloadMetadataInfo constructor

The code correctly uses the new DownloadMetadataInfo::new constructor method to create the returned value, consistent with the type being moved to the das-core crate.

program_transformers/Cargo.toml (2)

13-17: Dependencies look good.

The new dependencies and multiline formatting of features align well with the broader workspace dependency expansions in the project.


23-23: Added serde dependency.

Adding serde as a workspace dependency is appropriate for supporting serialization needs across components.

grpc-ingest/src/util.rs (1)

1-19: Well-implemented shutdown signal handler.

The create_shutdown function provides a clean, idiomatic way to handle UNIX termination signals in an asynchronous context using Tokio. This implementation will allow graceful shutdown across the various services within the gRPC ingestion pipeline.
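The control flow the review describes for create_shutdown can be modeled synchronously with a channel: one side fires when a termination signal arrives, and each worker loop checks it between units of work. This is a std-only sketch under that assumption; the real function uses Tokio UNIX signal streams and futures rather than `mpsc` polling.

```rust
use std::sync::mpsc;

// Run `work` repeatedly until the shutdown channel fires, returning how
// many units of work completed. Models "select on the shutdown future
// alongside the work" from the async implementation.
fn worker_loop(shutdown: &mpsc::Receiver<()>, mut work: impl FnMut()) -> u32 {
    let mut iterations = 0;
    loop {
        // try_recv stands in for polling the shutdown future.
        if shutdown.try_recv().is_ok() {
            return iterations;
        }
        work();
        iterations += 1;
        if iterations > 1000 {
            return iterations; // safety bound for this sketch only
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(()).unwrap(); // pretend SIGINT/SIGTERM already arrived
    let iters = worker_loop(&rx, || {});
    println!("worker exited after {} iterations", iters);
}
```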

program_transformers/src/token_extensions/mod.rs (1)

276-279: Constructor method usage looks good

Changed from direct struct initialization to using the DownloadMetadataInfo::new() constructor method, which aligns with the refactoring where this type was moved to the das_core crate.

grpc-ingest/src/postgres.rs (2)

12-22: Pool creation function implements good practices

The PostgreSQL connection pool creation is well-implemented with proper error handling and configuration parameter usage.


24-27: Metric reporting is straightforward

Good implementation of Prometheus metrics for monitoring pool health. Tracking both total and idle connections provides valuable operational insights.

core/Cargo.toml (1)

11-39: Dependencies appropriately updated for new functionality

The added dependencies support the new gRPC ingestion, Redis stream handling, and metadata downloading capabilities introduced in this PR.

docker-compose.yaml (1)

124-133: Prometheus service added for metrics collection

The Prometheus service is properly configured to collect metrics from the new gRPC ingestion components. The host gateway configuration enables it to communicate with services on the host machine.

grpc-ingest/build.rs (4)

1-7: Well structured build script with proper error handling.

The build script correctly uses vergen to emit build and Rust compiler metadata, with proper error handling via anyhow.


8-13: Good approach for cleaner Git version formatting.

Overriding the default vergen git version with git_version!() provides a cleaner and more readable version string.


28-38: Efficient version extraction implementation.

The get_pkg_version function efficiently handles potential duplicate versions by using a HashSet for deduplication before joining them as a comma-separated string.


14-26:

Details

✅ Verification successful

Verify the relative path assumption for Cargo.lock.

The script loads the Cargo.lock file from a fixed relative path "../Cargo.lock", which assumes this script is always run from within the grpc-ingest directory.


🏁 Script executed:

#!/bin/bash
# Check if Cargo.lock exists at the expected relative path
if [ -f "../Cargo.lock" ]; then
  echo "Cargo.lock exists at expected path"
else
  echo "ERROR: Cargo.lock not found at ../Cargo.lock"
  # Check where Cargo.lock actually exists
  find .. -name "Cargo.lock" -maxdepth 2
fi

Length of output: 182


To accurately verify this, let’s simulate the build script’s working directory. We need to cd into the grpc-ingest crate folder before checking for ../Cargo.lock.


🏁 Script executed:

#!/bin/bash
# Move into the crate directory
cd grpc-ingest

# Now check if Cargo.lock exists one level up
if [ -f "../Cargo.lock" ]; then
  echo "✅ Cargo.lock found at ../Cargo.lock relative to grpc-ingest"
else
  echo "❌ Cargo.lock not found at ../Cargo.lock relative to grpc-ingest"
  echo "Searching for any Cargo.lock within two levels up:"
  find .. -maxdepth 2 -type f -name "Cargo.lock"
fi

Length of output: 172


No changes needed — Cargo.lock path is valid

We verified by running the build script from the grpc-ingest directory that ../Cargo.lock correctly points to the root Cargo.lock. The relative‐path assumption is safe.

grpc-ingest/src/version.rs (1)

1-13: Clean Version struct definition with appropriate traits.

The Version struct is well-structured with relevant fields and properly derives Debug and Serialize traits.

grpc-ingest/Cargo.toml (3)

1-7: Good use of workspace inheritance for package metadata.

The package metadata appropriately uses workspace inheritance for shared configuration values.


8-57: Comprehensive dependency list with appropriate features.

The dependencies are well-specified with necessary features, particularly for sqlx, tokio, and redis.


58-62: Appropriate build dependencies for version information.

The build dependencies align well with the functionality in build.rs for version extraction and information generation.

grpc-ingest/config-grpc2redis.example.yml (4)

3-4: Consider security implications of Prometheus endpoint.

The Prometheus endpoint is configured to listen on all interfaces (0.0.0.0), which could expose metrics publicly. In production, consider restricting this to localhost or using authentication.


7-17: Update placeholder token value before deployment.

The example configuration contains a placeholder null value for x_token that needs to be replaced with an actual token for production use.


19-67: Well-structured subscription configurations with appropriate filters.

The subscription configurations are well-organized with appropriate stream settings and filters for different data types.


69-71: Update Redis URL for production deployments.

The example uses a local Redis URL that should be updated for production deployments to point to the actual Redis instance.

program_transformers/src/lib.rs (2)

23-23: LGTM! Good refactoring of shared components

The import of DownloadMetadataInfo and DownloadMetadataNotifier from the new das_core crate effectively centralizes these types, promoting code reuse across the codebase.


52-58: LGTM! Adding Deserialize derivation enables Redis ingestion

Adding the Deserialize trait to AccountInfo and TransactionInfo is a good addition that enables these structures to be deserialized from Redis streams in the new gRPC ingestion pipeline.

Also applies to: 60-67

program_transformers/src/bubblegum/db.rs (1)

497-506: rows_affected() can report 1 even when data is unchanged → duplicate download jobs

Postgres counts an ON CONFLICT … DO UPDATE as “1 row affected” even if the UPDATE sets every column to its current value.
The current logic therefore enqueues DownloadMetadataInfo repeatedly, causing unnecessary re-processing.

Two common mitigations:

  1. Add a WHERE clause that checks reindex IS NOT TRUE (or compares hashes) so a no-op update is filtered out entirely.
  2. Use RETURNING xmax: xmax = 0 means the row was freshly inserted, while a non-zero xmax means the conflict UPDATE actually ran.
ON CONFLICT (...) DO UPDATE
    SET reindex = TRUE
    WHERE excluded.base_info_seq > asset_data.base_info_seq
RETURNING (xmax = 0) AS inserted;

Then push to the queue only when a row comes back; with the WHERE guard in place, an untouched row returns no row at all.

Cargo.toml (2)

12-13: Workspace member addition looks good

grpc-ingest is properly declared as a workspace member, so the crate will participate in unified version resolution and the cargo workspace build graph.


144-146:

Details

❓ Verification inconclusive

Long-term stability risk when depending on git tags for yellowstone-grpc-*

Using git dependencies ties reproducible builds to the availability of the remote repository and prevents publishing to crates.io.
If possible:

  1. Depend on the crates once they are published, or
  2. Mirror the exact commit hash instead of a mutable tag to guard against force-pushes.

This will improve supply-chain security and build reproducibility.


Avoid Git tags for yellowstone-grpc-* in Cargo.toml

File: Cargo.toml (Lines 144–146)

Currently you’re pulling these crates directly from a Git tag:

yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.15.1+solana.1.18.22" }
yellowstone-grpc-proto  = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.15.1+solana.1.18.22" }
yellowstone-grpc-tools  = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.15.1+solana.1.18.22", default-features = false }

Long‐term builds that point to a mutable Git tag risk breakage if the tag is moved or the repo becomes unavailable, and you cannot publish a crate that has Git dependencies. To improve reproducibility and supply‐chain security:

  • If the crates are published on crates.io, depend on their published versions instead of the Git repo.
  • Otherwise, pin to a specific commit SHA rather than a named tag:
    yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", rev = "abcdef1234567890abcdef" }

Please verify whether yellowstone-grpc-client, yellowstone-grpc-proto, and yellowstone-grpc-tools are available on crates.io; if not, switch to rev = "<commit-hash>" to guard against force-pushes.

program_transformers/src/bubblegum/mint_v1.rs (1)

200-206: Missing return Ok(None) path keeps the DB transaction but leaks the task counter

Early-returning when uri.is_empty() skips the final logging while leaving any counters that upstream code incremented earlier unbalanced.
Verify that the caller properly decrements its task counters on this branch; otherwise add an explicit decrement (or remove the earlier increment) to keep the Prometheus gauges balanced.

grpc-ingest/src/redis.rs (1)

438-457: 🛠️ Refactor suggestion

Concurrency cap can be silently exceeded

`if tasks.len() >= config.max_concurrency { tasks.join_next().await; }` removes
only one finished task, so if max_concurrency is N and N new
messages are delivered at once, the loop will schedule N+N-1 tasks before
tasks.len() drops below the limit.

Use a while loop or tasks.join_next().await; in a loop to keep the cap
strict:

if tasks.len() >= config.max_concurrency {
-    tasks.join_next().await;
+    while tasks.len() >= config.max_concurrency {
+        tasks.join_next().await;
+    }
}

Likely an incorrect or invalid review comment.

grpc-ingest/src/config.rs (1)

22-28: json5 parser selected only for .json extension

The loader always feeds JSON files through json5::from_str, even when the
extension is plain .json. While JSON5 is backward-compatible most of the
time, subtle incompatibilities (e.g. single quotes, trailing commas) might
slip into a file that is intended to be strict JSON.

Consider:

  • `.json5` → `json5::from_str`
  • `.json` → `serde_json::from_str`

to avoid surprising behaviour for users expecting standard JSON.

tree.succeeded | Count of completed tree crawl
tree.crawled | Time to crawl a tree
job.completed | Time to complete the job

🧹 Nitpick (assertive)

Add a single trailing newline
Static analysis (MD047) indicates the file should end with exactly one newline. Please restore the trailing newline after line 86 to satisfy the markdown lint rule.

🧰 Tools
🪛 markdownlint-cli2 (0.17.2)

86-86: Files should end with a single newline character
null

(MD047, single-trailing-newline)

Comment on lines +1 to +3
config-grpc2redis.yaml
config-ingester.yaml
config-monitor.yaml

🧹 Nitpick (assertive)

Consider using glob patterns for config files
Instead of listing each config file explicitly, you can simplify maintenance by using a glob, e.g.:

- config-grpc2redis.yaml
- config-ingester.yaml
- config-monitor.yaml
+ config-*.yaml

This will automatically ignore any future config-*.yaml files without manual updates.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
config-grpc2redis.yaml
config-ingester.yaml
config-monitor.yaml
config-*.yaml

serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
solana-sdk = { workspace = true } # only prom rn

🧹 Nitpick (assertive)

Clarify the solana-sdk dependency usage comment.

The comment "# only prom rn" suggests that solana-sdk is only used for Prometheus metrics currently. Consider documenting this more clearly or removing the comment if the dependency is fully utilized.

-solana-sdk = { workspace = true } # only prom rn
+solana-sdk = { workspace = true } # Currently only used for Prometheus metrics

Suggested change
solana-sdk = { workspace = true } # only prom rn
solana-sdk = { workspace = true } # Currently only used for Prometheus metrics


### Metrics

Both grpc2redis and ingester services expose prometheus metrics and can be accessed at `http://localhost:9090/metrics`

🧹 Nitpick (assertive)

Punctuate the metrics endpoint sentence

Add a terminating period for consistency; markdownlint (MD036) separately flags this line for emphasis used where a heading belongs.

-Both grpc2redis and ingester services expose prometheus metrics and can be accessed at `http://localhost:9090/metrics`
+Both grpc2redis and ingester services expose prometheus metrics and can be accessed at `http://localhost:9090/metrics`.

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 markdownlint-cli2 (0.17.2)

54-54: Emphasis used instead of a heading
null

(MD036, no-emphasis-as-heading)

- Fetch account updates from redis and process them using the `program_transformer` crate
- Fetch transaction updates from redis and processe them
- Fetch snapshots from redis and process them
- download token metedata json and store them in postgres db

🧹 Nitpick (assertive)

Fix typo in "metadata"

There's a spelling error in "metedata".

-- download token metedata json and store them in postgres db
+- download token metadata json and store them in postgres db

Suggested change
- download token metedata json and store them in postgres db
- download token metadata json and store them in postgres db

Comment on lines +17 to +19
lazy_static::lazy_static! {
static ref REGISTRY: Registry = Registry::new();


🧹 Nitpick (assertive)

Consider switching from lazy_static to once_cell

The rest of the codebase already relies on Once from std; replacing lazy_static with once_cell::sync::Lazy (or std::sync::LazyLock on Rust 1.80+) would trade the macro-based crate for a lighter, macro-free API with the same semantics.

Comment on lines +111 to +140
pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
static REGISTER: Once = Once::new();
REGISTER.call_once(|| {
macro_rules! register {
($collector:ident) => {
REGISTRY
.register(Box::new($collector.clone()))
.expect("collector can't be registered");
};
}

register!(VERSION_INFO_METRIC);
register!(REDIS_STREAM_LENGTH);
register!(REDIS_XADD_STATUS_COUNT);
register!(REDIS_XREAD_COUNT);
register!(REDIS_XACK_COUNT);
register!(PGPOOL_CONNECTIONS);
register!(PROGRAM_TRANSFORMER_TASK_STATUS_COUNT);
register!(INGEST_JOB_TIME);
register!(DOWNLOAD_METADATA_FETCHED_COUNT);
register!(INGEST_TASKS);
register!(ACK_TASKS);
register!(GRPC_TASKS);
register!(BUBBLEGUM_TREE_TOTAL_LEAVES);
register!(BUBBLEGUM_TREE_INCORRECT_PROOFS);
register!(BUBBLEGUM_TREE_NOT_FOUND_PROOFS);
register!(BUBBLEGUM_TREE_CORRECT_PROOFS);
register!(BUBBLEGUM_TREE_CORRUPT_PROOFS);
register!(DOWNLOAD_METADATA_PUBLISH_TIME);


🧹 Nitpick (assertive)

Register the default process collector for richer telemetry

You manually register every custom collector but miss the standard
prometheus::process_collector::ProcessCollector (behind the crate's
process feature, Linux-only), which exposes CPU, memory and file-descriptor
metrics. Adding it costs little and improves out-of-the-box dashboards:

register!(prometheus::process_collector::ProcessCollector::for_self());

(Build/version metadata is already covered by VERSION_INFO_METRIC; the collector can be conditionally compiled to avoid duplication in other binaries.)

Comment on lines +60 to +64
static ref DOWNLOAD_METADATA_FETCHED_COUNT: IntGaugeVec = IntGaugeVec::new(
Opts::new("download_metadata_fetched_count", "Status of download metadata task"),
&["status"]
).unwrap();


🛠️ Refactor suggestion

Use IntCounterVec for monotonically-increasing “*_count” metric

DOWNLOAD_METADATA_FETCHED_COUNT is defined as an IntGaugeVec, yet in
download_metadata_json_task_status_count_inc (lines 269-273) you only ever
increment it.
According to Prometheus best-practices, counters must be used for values that
should only go up. Gauges are intended for values that may also go down or be
set arbitrarily, and mixing semantics hurts alerting & rate calculations.

-    static ref DOWNLOAD_METADATA_FETCHED_COUNT: IntGaugeVec = IntGaugeVec::new(
-        Opts::new("download_metadata_fetched_count", "Status of download metadata task"),
-        &["status"]
-    ).unwrap();
+    static ref DOWNLOAD_METADATA_FETCHED_COUNT: IntCounterVec = IntCounterVec::new(
+        Opts::new("download_metadata_fetched_count", "Status of download metadata task"),
+        &["status"]
+    ).unwrap();

No other code changes are needed because both inc() and with_label_values
are available on IntCounterVec.


Suggested change
static ref DOWNLOAD_METADATA_FETCHED_COUNT: IntGaugeVec = IntGaugeVec::new(
Opts::new("download_metadata_fetched_count", "Status of download metadata task"),
&["status"]
).unwrap();
static ref DOWNLOAD_METADATA_FETCHED_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new("download_metadata_fetched_count", "Status of download metadata task"),
&["status"]
).unwrap();

Comment on lines +154 to +172
let make_service = make_service_fn(move |_: &AddrStream| async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
let response = match req.uri().path() {
"/metrics" => metrics_handler(),
_ => not_found_handler(),
};
Ok::<_, hyper::Error>(response)
}))
});

let server = Server::try_bind(&address)?.serve(make_service);
info!("prometheus server started: http://{address:?}/metrics");

tokio::spawn(async move {
if let Err(error) = server.await {
error!("prometheus server failed: {error:?}");
}
});


🛠️ Refactor suggestion

Return a JoinHandle (or await) instead of fire-and-forget spawning

run_server starts the Hyper server with tokio::spawn and immediately
returns Ok(()). Callers therefore get no handle to await, cancel or listen
for shutdown, and unit/integration tests cannot deterministically wait for the
server to finish initialising.

Consider returning the JoinHandle<()> (or impl Future) so that the caller
can decide whether to detach or await:

-pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
+pub fn run_server(address: SocketAddr) -> anyhow::Result<tokio::task::JoinHandle<()>> {
     ...
-    tokio::spawn(async move {
+    let handle = tokio::spawn(async move {
         if let Err(error) = server.await {
             error!("prometheus server failed: {error:?}");
         }
     });
-    Ok(())
+    Ok(handle)
 }

That keeps the convenience of background execution while giving downstream code
proper lifecycle control.


Suggested change
let make_service = make_service_fn(move |_: &AddrStream| async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
let response = match req.uri().path() {
"/metrics" => metrics_handler(),
_ => not_found_handler(),
};
Ok::<_, hyper::Error>(response)
}))
});
let server = Server::try_bind(&address)?.serve(make_service);
info!("prometheus server started: http://{address:?}/metrics");
tokio::spawn(async move {
if let Err(error) = server.await {
error!("prometheus server failed: {error:?}");
}
});
pub fn run_server(address: SocketAddr) -> anyhow::Result<tokio::task::JoinHandle<()>> {
let make_service = make_service_fn(move |_: &AddrStream| async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
let response = match req.uri().path() {
"/metrics" => metrics_handler(),
_ => not_found_handler(),
};
Ok::<_, hyper::Error>(response)
}))
});
let server = Server::try_bind(&address)?.serve(make_service);
info!("prometheus server started: http://{address:?}/metrics");
let handle = tokio::spawn(async move {
if let Err(error) = server.await {
error!("prometheus server failed: {error:?}");
}
});
Ok(handle)
}

Comment on lines +176 to +187
fn metrics_handler() -> Response<Body> {
let metrics = TextEncoder::new()
.encode_to_string(&REGISTRY.gather())
.unwrap_or_else(|error| {
error!("could not encode custom metrics: {}", error);
String::new()
});
Response::builder()
.header("content-type", "text/plain")
.body(Body::from(metrics))
.unwrap()
}

🧹 Nitpick (assertive)

Reuse a single TextEncoder to avoid tiny per-request allocations

metrics_handler creates a new TextEncoder on every scrape even though the
type is stateless. You can store one in a lazy_static or once_cell and
reuse it:

-fn metrics_handler() -> Response<Body> {
-    let metrics = TextEncoder::new()
+lazy_static::lazy_static! {
+    static ref TEXT_ENCODER: prometheus::TextEncoder = prometheus::TextEncoder::new();
+}
+
+fn metrics_handler() -> Response<Body> {
+    let metrics = TEXT_ENCODER
         .encode_to_string(&REGISTRY.gather())

The saving is small but free and eliminates needless allocations on every
Prometheus scrape.


Suggested change
fn metrics_handler() -> Response<Body> {
let metrics = TextEncoder::new()
.encode_to_string(&REGISTRY.gather())
.unwrap_or_else(|error| {
error!("could not encode custom metrics: {}", error);
String::new()
});
Response::builder()
.header("content-type", "text/plain")
.body(Body::from(metrics))
.unwrap()
}
// Add at the top of the file (or before `metrics_handler`)
lazy_static::lazy_static! {
static ref TEXT_ENCODER: prometheus::TextEncoder = prometheus::TextEncoder::new();
}
fn metrics_handler() -> Response<Body> {
let metrics = TEXT_ENCODER
.encode_to_string(&REGISTRY.gather())
.unwrap_or_else(|error| {
error!("could not encode custom metrics: {}", error);
String::new()
});
Response::builder()
.header("content-type", "text/plain")
.body(Body::from(metrics))
.unwrap()
}


Development

Successfully merging this pull request may close these issues.

Proposal: gRPC based indexer

5 participants