Conversation

@nischitpra nischitpra commented Sep 11, 2025

Summary by CodeRabbit

  • New Features
    • Added a committer CLI command to process block data end-to-end: discover S3 parquet files, stream and validate blocks, and publish to Kafka.
  • Configuration
    • Enabled .env and environment-variable-driven configuration for ClickHouse, Kafka, S3, and RPC with sensible defaults.
  • Documentation
    • Added committer README with usage, config, S3 file conventions, and processing flow.
  • Chores
    • Added dependencies to support environment variable parsing and .env loading.

coderabbitai bot commented Sep 11, 2025

Walkthrough

Adds a new "committer" Cobra CLI command and a committer package that reads block Parquet files from S3, validates sequencing with ClickHouse, and publishes blocks to Kafka; adds env-driven configuration (.env + env parsing) and supporting dependencies; updates a Kafka message struct pointer type.

Changes

  • CLI: committer command (cmd/committer.go, cmd/root.go)
    Adds new committer Cobra subcommand; implements RunCommitter to init RPC, obtain chain ID, call committer.Init and committer.Commit; registers command in root.
  • Committer implementation & docs (internal/committer/committer.go, internal/committer/fetchLatest.go, internal/committer/README.md)
    New committer package: types BlockRange, ParquetBlockData; APIs Init, Commit, Close; S3 parquet discovery/download, streaming Parquet → internal BlockData parsing, sequencing validation, batched Kafka publish, download/processing concurrency via channel; FetchLatest polling helper; README documents flow, config, file/schema expectations.
  • Configuration: env + .env loading (configs/config.go)
    Adds many COMMITTER_/STAGING_ env-tagged fields (ClickHouse, Kafka, S3, RPC concurrency), uses godotenv.Load and env.Parse to populate global config, preserves Viper unmarshalling.
  • Dependencies (go.mod)
    Adds github.com/caarlos0/env and github.com/joho/godotenv dependencies.
  • Kafka publisher struct change (internal/storage/kafka_publisher.go)
    Changes PublishableMessageBlockData.BlockData from common.BlockData to *common.BlockData and updates construction sites to pass pointer.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor User
  participant CLI as CLI (committer)
  participant RPC as RPC Client
  participant Committer as Committer
  participant CH as ClickHouse
  participant S3 as S3
  participant Kafka as Kafka

  User->>CLI: run "committer"
  CLI->>RPC: Initialize()
  CLI->>RPC: GetChainID()
  CLI->>Committer: Init(chainId, rpc)
  CLI->>Committer: Commit(chainId)

  Note over Committer: Process S3 parquet ranges
  Committer->>CH: GetMaxCommittedBlock()
  Committer->>S3: List parquet files (chain prefix)
  Committer->>Committer: Filter & sort ranges
  loop each range
    Committer->>S3: Download parquet
    Committer->>Committer: Parse stream rows
    Committer->>Kafka: Publish batches
  end

  Note over Committer,RPC: Fetch latest via RPC loop
  loop poll
    Committer->>RPC: GetLatestBlockNumber()
    alt latest > nextCommit
      Committer->>RPC: Fetch blocks in parallel batches
      Committer->>Kafka: Publish batches
    else
      Committer-->>Committer: wait & retry
    end
  end

  CLI-->>User: exit
sequenceDiagram
  autonumber
  participant Committer
  participant RPC as RPC Client

  loop FetchLatest
    Committer->>RPC: GetLatestBlockNumber(ctx)
    alt error
      Committer-->>Committer: return error
    else latest > nextCommit
      Committer->>Committer: schedule parallel batch fetch & publish
    else
      Committer-->>Committer: continue polling
    end
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Pre-merge checks (1 passed, 1 warning, 1 inconclusive)

❌ Failed checks (1 warning, 1 inconclusive)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 0.00%, which is insufficient; the required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
  • Title Check (❓ Inconclusive): The current title "committer redo" references the committer work in this changeset but is vague and non-descriptive: "redo" does not convey the primary changes (new committer package/CLI, S3→Kafka processing, and added config fields). Because it lacks sufficient specificity for a reviewer to quickly understand the main change, the check is inconclusive. Rename the PR to a concise, specific sentence summarizing the primary change, for example "Add committer: S3-to-Kafka processing, CLI command, and config fields" or "Introduce committer module and CLI to publish S3 Parquet blocks to Kafka".
✅ Passed checks (1 passed)
  • Description Check (✅ Passed): Check skipped - CodeRabbit's high-level summary is enabled.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch np/committer_redo

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 10

🧹 Nitpick comments (10)
go.mod (1)

55-55: Update github.com/caarlos0/env to v11 and update imports

Latest major is v11 — import path github.com/caarlos0/env/v11. Replace the go.mod require (currently "github.com/caarlos0/env v3.5.0+incompatible" at go.mod:55) with "github.com/caarlos0/env/v11" and update all code imports to github.com/caarlos0/env/v11.
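
For illustration, a minimal sketch of what the v11 usage could look like. The struct and field names here are hypothetical; only the import path and the env.Parse call shape are the point:

```go
package configs

import env "github.com/caarlos0/env/v11"

// Hypothetical struct for illustration; the real env-tagged fields
// live in configs/config.go.
type committerEnv struct {
	ClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"`
	KafkaBrokers   string `env:"COMMITTER_KAFKA_BROKERS" envDefault:"localhost:9092"`
}

func parseCommitterEnv() (committerEnv, error) {
	var c committerEnv
	// env.Parse keeps the same call shape across major versions.
	return c, env.Parse(&c)
}
```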

internal/committer/README.md (1)

126-129: Add a language to the fenced code block (MD040).

Use a neutral lexer.

-```
+```text
 chain_${chainId}/year=2024/blocks_1000_2000.parquet

configs/config.go (1)

10-11: Avoid mixing two env-loading paradigms unless necessary.

You’re enabling viper.AutomaticEnv and also parsing with caarlos0/env plus loading .env via godotenv. Consider simplifying to one approach (e.g., rely on viper + replacer for nested config, and keep caarlos0/env only for the explicit top-level env-only fields), and document the precedence.
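
As a rough sketch of the viper-only option (key and variable names are illustrative):

```go
package configs

import (
	"strings"

	"github.com/spf13/viper"
)

// Illustrative single-paradigm setup: map nested file keys such as
// "committer.clickhouse.host" to COMMITTER_CLICKHOUSE_HOST. Note that
// viper.Unmarshal may still need explicit BindEnv calls for nested
// keys (see the follow-up comment on lines 327-336 below).
func initViperEnv() {
	viper.AutomaticEnv()
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
}
```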

cmd/committer.go (1)

14-15: Fix typo in command description.

There's a typo in the Long description.


-	Long:  "published data from s3 to kafka. if block is not found in s3, it will panic",
+	Long:  "publishes data from S3 to Kafka. If a block is not found in S3, it will panic",
internal/committer/committer.go (6)

296-296: Make batch size configurable.

The batch size is hardcoded to 500. Consider making it configurable for different environments and performance requirements.

-	batchSize := 500 // Publish 500 blocks at a time
+	batchSize := config.Cfg.CommitterKafkaBatchSize
+	if batchSize == 0 {
+		batchSize = 500 // Default batch size
+	}

517-522: Use io.Copy instead of ReadFrom for better error handling.

Using io.Copy provides better control over the copy operation and is more idiomatic.

 	log.Debug().Str("file", blockRange.S3Key).Msg("Starting file stream to disk")
-	_, err = file.ReadFrom(result.Body)
+	_, err = io.Copy(file, result.Body)
 	if err != nil {
 		os.Remove(localPath) // Clean up on error
 		return fmt.Errorf("failed to write file: %w", err)
 	}

716-898: Extract fetchLatest logic into smaller functions for better maintainability.

The fetchLatest function is too complex (182 lines). Consider breaking it down into smaller, focused functions.

The function handles multiple responsibilities:

  1. Polling for latest blocks
  2. Calculating fetch ranges
  3. Parallel batch fetching
  4. Block validation
  5. Kafka publishing

Would you like me to help refactor this into smaller, more maintainable functions?
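
For instance, two of those responsibilities could be pulled out roughly like this. The interface and helper names are hypothetical, not the existing API:

```go
package committer

import (
	"context"
	"math/big"
)

// Minimal stand-in for the repo's rpc.IRPCClient, for illustration only.
type latestBlockClient interface {
	GetLatestBlockNumber(ctx context.Context) (*big.Int, error)
}

// pollLatest isolates the polling responsibility.
func pollLatest(ctx context.Context, c latestBlockClient) (*big.Int, error) {
	return c.GetLatestBlockNumber(ctx)
}

// computeRanges isolates range calculation: it splits [next, latest]
// into inclusive batches of at most batch blocks.
func computeRanges(next, latest *big.Int, batch int64) [][2]*big.Int {
	var out [][2]*big.Int
	cur := new(big.Int).Set(next)
	for cur.Cmp(latest) <= 0 {
		end := new(big.Int).Add(cur, big.NewInt(batch-1))
		if end.Cmp(latest) > 0 {
			end.Set(latest)
		}
		out = append(out, [2]*big.Int{new(big.Int).Set(cur), end})
		cur = new(big.Int).Add(end, big.NewInt(1))
	}
	return out
}
```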


730-731: Make RPC batch configuration consistent with global settings.

The RPC batch size is hardcoded while parallel calls are configurable. Both should follow the same pattern.

 	rpcNumParallelCalls := config.Cfg.CommitterRPCNumParallelCalls
-	rpcBatchSize := int64(50)
+	rpcBatchSize := config.Cfg.CommitterRPCBatchSize
+	if rpcBatchSize == 0 {
+		rpcBatchSize = 50 // Default batch size
+	}

356-365: Use defer for mutex unlock to ensure cleanup.

Using defer ensures the mutex is unlocked even if a panic occurs.

 	// Clear block data from memory to free up space
 	mu.Lock()
+	defer mu.Unlock()
 	blockDataCount := len(blockRange.BlockData)
 	blockRange.BlockData = nil
-	mu.Unlock()

204-228: Add context support for database operations.

Database queries should use context with timeout for better control.

 func getMaxBlockNumberFromClickHouse(chainId *big.Int) (*big.Int, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	
 	// Use toString() to force ClickHouse to return a string instead of UInt256
 	query := fmt.Sprintf("SELECT toString(max(block_number)) FROM blocks WHERE chain_id = %d", chainId.Uint64())
-	rows, err := clickhouseConn.Query(context.Background(), query)
+	rows, err := clickhouseConn.Query(ctx, query)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between bd3eca1 and b8be8e4.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (8)
  • cmd/committer.go (1 hunks)
  • cmd/root.go (1 hunks)
  • configs/config.go (3 hunks)
  • go.mod (2 hunks)
  • internal/committer/README.md (1 hunks)
  • internal/committer/committer.go (1 hunks)
  • internal/committer/fetchLatest.go (1 hunks)
  • internal/storage/kafka_publisher.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
cmd/committer.go (2)
internal/rpc/rpc.go (1)
  • Initialize (68-98)
internal/committer/committer.go (2)
  • Init (63-71)
  • Commit (132-201)
internal/committer/fetchLatest.go (1)
internal/rpc/rpc.go (1)
  • IRPCClient (42-55)
internal/storage/kafka_publisher.go (1)
internal/common/block.go (1)
  • BlockData (61-66)
internal/committer/committer.go (4)
internal/common/block.go (2)
  • BlockData (61-66)
  • Block (8-33)
internal/rpc/rpc.go (3)
  • IRPCClient (42-55)
  • Client (57-66)
  • GetFullBlockResult (18-22)
internal/storage/kafka_publisher.go (2)
  • KafkaPublisher (20-23)
  • NewKafkaPublisher (60-111)
configs/config.go (3)
  • Cfg (294-294)
  • Config (264-292)
  • KafkaConfig (152-157)
🪛 ast-grep (0.38.6)
internal/committer/committer.go

[warning] 79-79: MinVersion is missing from this TLS configuration. By default, TLS 1.2 is currently used as the minimum when acting as a client, and TLS 1.0 when acting as a server. General-purpose web applications should default to TLS 1.3 with all other protocols disabled. Only where it is known that a web server must support legacy clients with unsupported and insecure browsers (such as Internet Explorer 10) may it be necessary to enable TLS 1.0. Add 'MinVersion: tls.VersionTLS13' to the TLS configuration to bump the minimum version to TLS 1.3.
Context: tls.Config{}
Note: [CWE-327]: Use of a Broken or Risky Cryptographic Algorithm [OWASP A03:2017]: Sensitive Data Exposure [OWASP A02:2021]: Cryptographic Failures [REFERENCES]
https://owasp.org/Top10/A02_2021-Cryptographic_Failures

(missing-ssl-minversion-go)

🪛 markdownlint-cli2 (0.17.2)
internal/committer/README.md

126-126: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test
🔇 Additional comments (5)
go.mod (1)

55-55: Direct deps should not be marked indirect — run go mod tidy

rg returned no files in the sandbox so I couldn't verify imports. If these packages are direct imports (e.g., configs/config.go), run:
go mod tidy && git diff -- go.mod go.sum
or remove the // indirect markers at go.mod lines 55 and 101 (github.com/caarlos0/env v3.5.0+incompatible and github.com/joho/godotenv v1.5.1).

internal/committer/README.md (1)

175-181: Verify “semaphores” claim matches implementation.

Doc says semaphores limit concurrency; ensure the code actually uses them (or reword).
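
If the wording is kept, the "download/processing concurrency via channel" the walkthrough describes is conventionally a buffered-channel semaphore; a minimal runnable sketch (the limit value is illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxParallel = 4                   // illustrative, e.g. StagingS3MaxParallelFileDownload
	sem := make(chan struct{}, maxParallel) // buffered channel acts as the semaphore

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it
			fmt.Println("downloading file", id)
		}(i)
	}
	wg.Wait()
}
```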

configs/config.go (1)

276-292: LGTM on new env-backed fields and defaults.

Names are clear; sensible defaults for TLS and parallelism.

cmd/root.go (1)

432-432: LGTM! Command registration is properly integrated.

The new committer subcommand is correctly registered in the command hierarchy.

internal/storage/kafka_publisher.go (1)

37-39: Verify that consumers handle the pointer type change.

Changing common.BlockData → *common.BlockData is a breaking schema change for Kafka consumers. Repo search only found producer-side usage in internal/storage/kafka_publisher.go (struct at lines 37–39; createBlockDataMessage at 260–262) — no internal consumers detected. Verify all external Kafka consumers that deserialize PublishableMessageBlockData accept the pointer (or update them / provide a backward-compatible migration).
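
One concrete check worth running while verifying, assuming the message is JSON-serialized (the struct below is a stand-in): Go's encoding/json produces identical output for a value and a non-nil pointer, so the practical wire-format risk is a nil pointer rather than the type change itself.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type blockData struct{ Number uint64 } // stand-in for common.BlockData

type byValue struct{ BlockData blockData }
type byPointer struct{ BlockData *blockData }

func main() {
	v, _ := json.Marshal(byValue{BlockData: blockData{Number: 7}})
	p, _ := json.Marshal(byPointer{BlockData: &blockData{Number: 7}})
	fmt.Println(string(v) == string(p)) // true: same wire format for non-nil pointers

	n, _ := json.Marshal(byPointer{}) // a nil pointer, however, marshals as null
	fmt.Println(string(n))            // {"BlockData":null}
}
```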

Comment on lines +19 to +29
func RunCommitter(cmd *cobra.Command, args []string) {
	fmt.Println("running committer")
	rpc, err := rpc.Initialize()
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to initialize RPC")
	}
	chainId := rpc.GetChainID()

	committer.Init(chainId, rpc)
	committer.Commit(chainId)
}

⚠️ Potential issue

Add proper error handling and resource cleanup.

The function should properly handle errors and clean up resources when the committer finishes or encounters an error.

 func RunCommitter(cmd *cobra.Command, args []string) {
 	fmt.Println("running committer")
 	rpc, err := rpc.Initialize()
 	if err != nil {
 		log.Fatal().Err(err).Msg("Failed to initialize RPC")
 	}
+	defer rpc.Close()
+	
 	chainId := rpc.GetChainID()
 
 	committer.Init(chainId, rpc)
-	committer.Commit(chainId)
+	if err := committer.Commit(chainId); err != nil {
+		committer.Close()
+		log.Fatal().Err(err).Msg("Committer failed")
+	}
+	if err := committer.Close(); err != nil {
+		log.Error().Err(err).Msg("Failed to close committer cleanly")
+	}
 }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In cmd/committer.go around lines 19 to 29, the function lacks proper error
handling and resource cleanup: ensure rpc is closed on exit and handle errors
from Init and Commit. After rpc.Initialize(), if err != nil keep the current
fatal log; otherwise defer a safe rpc.Close() (or Close context) to release
resources. Check and handle errors returned by committer.Init and
committer.Commit (log and exit non‑zero or return the error) instead of ignoring
them, and ensure cleanup runs on both success and error paths.

Comment on lines +297 to +305
err := godotenv.Load()
if err != nil {
log.Info().Msg("No .env file found")
}
err = env.Parse(&Cfg)
if err != nil {
panic(err)
}


🛠️ Refactor suggestion

Set precedence to ENV > file and avoid panic; parse env last and return errors.

Current flow parses env first, then unmarshals file (file overrides env) and panics on parse errors. Common precedence is ENV > file, and this function already returns errors.

@@
-	err := godotenv.Load()
-	if err != nil {
-		log.Info().Msg("No .env file found")
-	}
-	err = env.Parse(&Cfg)
-	if err != nil {
-		panic(err)
-	}
+	err := godotenv.Load()
+	if err != nil {
+		// .env is optional; keep noise low but retain diagnostics
+		log.Debug().Err(err).Msg("Skipping .env load")
+	}
@@
-	err = viper.Unmarshal(&Cfg)
+	err = viper.Unmarshal(&Cfg)
 	if err != nil {
 		return fmt.Errorf("error unmarshalling config: %v", err)
 	}
+
+	// Parse env-tagged fields last so ENV overrides file values.
+	if err := env.Parse(&Cfg); err != nil {
+		return fmt.Errorf("error parsing env into config: %w", err)
+	}

Also applies to: 332-336

🤖 Prompt for AI Agents
In configs/config.go around lines 297-305 (and similarly 332-336), the current
sequence calls godotenv.Load(), then env.Parse(&Cfg) so file values override
environment and the code panics on parse errors; change the flow so environment
variables take precedence by parsing env last and do not panic—load the file
first into a temporary struct or map, then call env.Parse(&Cfg) to overwrite
with ENV values, propagate and return any parse error instead of panicking, and
adjust both affected blocks accordingly.

Comment on lines +64 to +71
	rpcClient = rpc
	tempDir = filepath.Join(os.TempDir(), "committer", fmt.Sprintf("chain_%d", chainId.Uint64()))
	downloadComplete = make(chan *BlockRange, config.Cfg.StagingS3MaxParallelFileDownload)

	initClickHouse()
	initS3()
	initKafka()
}

🛠️ Refactor suggestion

Validate initialization parameters.

The Init function should validate that required parameters are not nil.

 func Init(chainId *big.Int, rpc rpc.IRPCClient) {
+	if chainId == nil {
+		log.Fatal().Msg("Chain ID cannot be nil")
+	}
+	if rpc == nil {
+		log.Fatal().Msg("RPC client cannot be nil")
+	}
 	rpcClient = rpc
 	tempDir = filepath.Join(os.TempDir(), "committer", fmt.Sprintf("chain_%d", chainId.Uint64()))
 	downloadComplete = make(chan *BlockRange, config.Cfg.StagingS3MaxParallelFileDownload)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-	rpcClient = rpc
-	tempDir = filepath.Join(os.TempDir(), "committer", fmt.Sprintf("chain_%d", chainId.Uint64()))
-	downloadComplete = make(chan *BlockRange, config.Cfg.StagingS3MaxParallelFileDownload)
-	initClickHouse()
-	initS3()
-	initKafka()
-}
+	if chainId == nil {
+		log.Fatal().Msg("Chain ID cannot be nil")
+	}
+	if rpc == nil {
+		log.Fatal().Msg("RPC client cannot be nil")
+	}
+	rpcClient = rpc
+	tempDir = filepath.Join(os.TempDir(), "committer", fmt.Sprintf("chain_%d", chainId.Uint64()))
+	downloadComplete = make(chan *BlockRange, config.Cfg.StagingS3MaxParallelFileDownload)
+	initClickHouse()
+	initS3()
+	initKafka()
+}
🤖 Prompt for AI Agents
In internal/committer/committer.go around lines 64 to 71, the Init function
assigns rpc and uses chainId and config without validating them; add explicit
nil/zero-value checks for the inputs before using them (e.g., ensure rpc is not
nil, chainId is not nil before calling Uint64, and relevant config/Cfg fields
are initialized), and return a descriptive error if any required parameter is
missing/invalid instead of proceeding to set rpcClient, tempDir, or creating
channels and initializing subsystems.

Comment on lines +76 to +83
	Addr:     []string{fmt.Sprintf("%s:%d", config.Cfg.CommitterClickhouseHost, config.Cfg.CommitterClickhousePort)},
	Protocol: clickhouse.Native,
	TLS: func() *tls.Config {
		if config.Cfg.CommitterClickhouseEnableTLS {
			return &tls.Config{}
		}
		return nil
	}(),

⚠️ Potential issue

Configure TLS minimum version for security.

The TLS configuration should specify a minimum version for security best practices.

 	TLS: func() *tls.Config {
 		if config.Cfg.CommitterClickhouseEnableTLS {
-			return &tls.Config{}
+			return &tls.Config{
+				MinVersion: tls.VersionTLS12,
+			}
 		}
 		return nil
 	}(),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 	Addr:     []string{fmt.Sprintf("%s:%d", config.Cfg.CommitterClickhouseHost, config.Cfg.CommitterClickhousePort)},
 	Protocol: clickhouse.Native,
 	TLS: func() *tls.Config {
 		if config.Cfg.CommitterClickhouseEnableTLS {
-			return &tls.Config{}
+			return &tls.Config{
+				MinVersion: tls.VersionTLS12,
+			}
 		}
 		return nil
 	}(),
🧰 Tools
🪛 ast-grep (0.38.6)

[warning] 79-79: MinVersion is missing from this TLS configuration. By default, TLS 1.2 is currently used as the minimum when acting as a client, and TLS 1.0 when acting as a server. General-purpose web applications should default to TLS 1.3 with all other protocols disabled. Only where it is known that a web server must support legacy clients with unsupported and insecure browsers (such as Internet Explorer 10) may it be necessary to enable TLS 1.0. Add 'MinVersion: tls.VersionTLS13' to the TLS configuration to bump the minimum version to TLS 1.3.
Context: tls.Config{}
Note: [CWE-327]: Use of a Broken or Risky Cryptographic Algorithm [OWASP A03:2017]: Sensitive Data Exposure [OWASP A02:2021]: Cryptographic Failures [REFERENCES]
https://owasp.org/Top10/A02_2021-Cryptographic_Failures

(missing-ssl-minversion-go)

🤖 Prompt for AI Agents
In internal/committer/committer.go around lines 76 to 83, the TLS config
currently returns an empty tls.Config when TLS is enabled; update it to set a
secure minimum TLS version (e.g. tls.VersionTLS12) by returning
&tls.Config{MinVersion: tls.VersionTLS12} when
config.Cfg.CommitterClickhouseEnableTLS is true so the ClickHouse client
enforces that minimum version.

Comment on lines +113 to +114
		o.BaseEndpoint = aws.String("https://s3.us-west-2.amazonaws.com")
	})

⚠️ Potential issue

Remove hardcoded S3 endpoint to support multiple regions.

The S3 endpoint is hardcoded to us-west-2, which prevents using buckets in other regions. The endpoint should be dynamic based on the configured region.

 	s3Client = s3.NewFromConfig(awsCfg, func(o *s3.Options) {
-		o.BaseEndpoint = aws.String("https://s3.us-west-2.amazonaws.com")
+		// Let the SDK determine the endpoint based on the region
 	})
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 	s3Client = s3.NewFromConfig(awsCfg, func(o *s3.Options) {
-		o.BaseEndpoint = aws.String("https://s3.us-west-2.amazonaws.com")
+		// Let the SDK determine the endpoint based on the region
 	})
🤖 Prompt for AI Agents
In internal/committer/committer.go around lines 113-114, remove the hardcoded S3
endpoint ("https://s3.us-west-2.amazonaws.com") so region-specific buckets work;
either delete the BaseEndpoint override to let the AWS SDK derive the correct
endpoint from the configured region, or construct the endpoint dynamically from
the configured region (e.g., use the existing region/config variable to build
"https://s3.<region>.amazonaws.com") and assign that to BaseEndpoint; update any
related variable names or tests to use the configured region.

Comment on lines +178 to +180
		if err := downloadFile(&blockRange); err != nil {
			log.Panic().Err(err).Str("file", blockRange.S3Key).Msg("Failed to download file")
		}

🛠️ Refactor suggestion

Avoid using panic for recoverable errors.

Using panic for download failures is too severe. The function should return an error and let the caller decide how to handle it.

 		if err := downloadFile(&blockRange); err != nil {
-			log.Panic().Err(err).Str("file", blockRange.S3Key).Msg("Failed to download file")
+			log.Error().Err(err).Str("file", blockRange.S3Key).Msg("Failed to download file")
+			return fmt.Errorf("failed to download file %s: %w", blockRange.S3Key, err)
 		}

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In internal/committer/committer.go around lines 178 to 180, the code uses
log.Panic() when downloadFile fails which is too severe; change the function to
return an error instead of panicking: replace the log.Panic() call with a logged
error (e.g., log.Error().Err(err).Str("file", blockRange.S3Key).Msg("Failed to
download file")) and return the error (wrap with context if desired). Update the
current function signature to return error if it doesn't already and propagate
that error to callers (adjust their signatures/handling to either return the
error up the stack or handle it appropriately). Ensure all call sites are
updated to check and handle the returned error instead of relying on a panic.

Comment on lines +271 to +273
Msg("Found block number greater than expected - missing block in sequence")
log.Panic().Msg("Block sequence mismatch")
}

🛠️ Refactor suggestion

Replace panic with proper error handling for sequence mismatches.

Panicking on sequence mismatch is too harsh. Consider returning an error or implementing a recovery mechanism.

 			log.Error().
 				Str("file", blockRange.S3Key).
 				Uint64("block_number", blockData.Block.Number.Uint64()).
 				Str("next_commit_block", nextCommitBlockNumber.String()).
 				Msg("Found block number greater than expected - missing block in sequence")
-			log.Panic().Msg("Block sequence mismatch")
+			return fmt.Errorf("block sequence mismatch: expected %s, got %d", nextCommitBlockNumber.String(), blockData.Block.Number.Uint64())

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +848 to +849
log.Panic().Msg("Failed to fetch all required blocks")
}

🛠️ Refactor suggestion

Consolidate panic usage for consistency.

The code uses panic in multiple places for different types of errors. Consider establishing a clear policy on when to panic vs return errors.

Critical errors that should panic:

  • Data integrity issues (missing blocks in sequence)
  • Failed Kafka publishes (data loss risk)

Errors that should be returned/retried:

  • Network failures (S3, RPC)
  • Temporary resource issues

Consider implementing a centralized error handling strategy with retry logic for transient failures.

Also applies to: 885-888

🤖 Prompt for AI Agents
In internal/committer/committer.go around lines 848 (and also inspect 885-888),
the reviewer flagged inconsistent use of panic; update this area to follow a
clear policy: only panic for unrecoverable data-integrity issues (e.g., missing
sequential blocks) and for failed Kafka publishes, and for all other failures
(network/S3/RPC/temporary resource problems) return an error or retry.
Concretely, replace the unconditional log.Panic() at line 848 with logic that
distinguishes the error type: if it’s a data-integrity error, call log.Panic()
with context; otherwise wrap and return the error to the caller (or invoke a
retry helper). Apply the same change pattern at 885-888, and factor shared
retry/centralized error-handling into a small helper used by these sites.
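
A sketch of one way to centralize that policy; the error class and helper names are hypothetical:

```go
package committer

import (
	"errors"
	"fmt"
	"time"
)

// errIntegrity marks unrecoverable data-integrity failures (sequence gaps,
// failed Kafka publishes); everything else is treated as transient.
var errIntegrity = errors.New("data integrity violation")

// withRetry retries transient failures and panics only on integrity errors.
func withRetry(attempts int, delay time.Duration, op func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if errors.Is(err, errIntegrity) {
			panic(fmt.Sprintf("unrecoverable: %v", err))
		}
		time.Sleep(delay)
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}
```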

Comment on lines +10 to +21
func FetchLatest(chainId *big.Int, rpc rpc.IRPCClient) error {
	for {
		latestBlock, err := rpc.GetLatestBlockNumber(context.Background())
		if err != nil {
			return err
		}
		if latestBlock.Cmp(chainId) > 0 {
			return nil
		}
	}
	return nil
}

⚠️ Potential issue

Fix logic error and infinite loop vulnerability.

The function has a logic error where it compares latestBlock with chainId instead of a block number. This comparison doesn't make semantic sense and will likely cause incorrect behavior. Additionally, the infinite loop lacks any delay between iterations and has no escape mechanism for graceful shutdown.

The function appears to be comparing a block number with a chain ID, which are different types of values. This needs to be fixed to compare with the appropriate block number parameter. Also, the infinite loop should have proper delay and cancellation support:

-func FetchLatest(chainId *big.Int, rpc rpc.IRPCClient) error {
+func FetchLatest(ctx context.Context, targetBlockNumber *big.Int, rpc rpc.IRPCClient) error {
 	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		
 		latestBlock, err := rpc.GetLatestBlockNumber(context.Background())
 		if err != nil {
 			return err
 		}
-		if latestBlock.Cmp(chainId) > 0 {
+		if latestBlock.Cmp(targetBlockNumber) >= 0 {
 			return nil
 		}
+		
+		// Add delay to avoid tight loop
+		time.Sleep(250 * time.Millisecond)
 	}
-	return nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-func FetchLatest(chainId *big.Int, rpc rpc.IRPCClient) error {
+func FetchLatest(ctx context.Context, targetBlockNumber *big.Int, rpc rpc.IRPCClient) error {
 	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+
 		latestBlock, err := rpc.GetLatestBlockNumber(context.Background())
 		if err != nil {
 			return err
 		}
-		if latestBlock.Cmp(chainId) > 0 {
+		if latestBlock.Cmp(targetBlockNumber) >= 0 {
 			return nil
 		}
+
+		// Add delay to avoid tight loop
+		time.Sleep(250 * time.Millisecond)
 	}
-	return nil
 }

Comment on lines +20 to +52
```go
package main

import (
	"context"
	"math/big"
	"log"

	"github.com/thirdweb-dev/indexer/internal/committer"
	"github.com/thirdweb-dev/indexer/configs"
)

func main() {
	// Load configuration
	if err := configs.LoadConfig("config.yml"); err != nil {
		log.Fatal("Failed to load config:", err)
	}

	// Create committer for chain ID 1 (Ethereum mainnet)
	chainId := big.NewInt(1)
	committer, err := committer.NewCommitterFromConfig(chainId)
	if err != nil {
		log.Fatal("Failed to create committer:", err)
	}
	defer committer.Close()

	// Process blocks
	ctx := context.Background()
	if err := committer.ProcessBlocks(ctx); err != nil {
		log.Fatal("Failed to process blocks:", err)
	}
}
```

⚠️ Potential issue

Examples reference non-existent APIs; align with actual public surface.

README shows NewCommitterFromConfig/NewCommitter/ProcessBlocks, but the code exposes Init/Commit (and a CLI subcommand). Please update the examples to either:

  • demonstrate the CLI: insight committer with env-config, or
  • show programmatic usage via committer.Init(chainID, rpc) then committer.Commit(chainID) (and Close()), matching cmd/committer.go.

I can draft a corrected snippet once you confirm the intended public API.

Also applies to: 56-104
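
For reference, a snippet matching the public surface actually shown in cmd/committer.go would be roughly the following; exact signatures may differ, and error handling is elided as in the current CLI:

```go
package main

import (
	"log"

	"github.com/thirdweb-dev/indexer/internal/committer"
	"github.com/thirdweb-dev/indexer/internal/rpc"
)

func main() {
	// Mirrors cmd/committer.go: initialize RPC, derive the chain ID,
	// then Init and Commit.
	rpcClient, err := rpc.Initialize()
	if err != nil {
		log.Fatal(err)
	}
	chainId := rpcClient.GetChainID() // e.g. chain ID 1 for Ethereum mainnet

	committer.Init(chainId, rpcClient)
	committer.Commit(chainId)
	committer.Close()
}
```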

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
configs/config.go (1)

372-380: Bug: Orchestrator chainBasedConfig is assigned to Main.Clickhouse.

This incorrectly writes orchestrator JSON into Main.Clickhouse. Should target Storage.Orchestrator.Clickhouse.

-	if Cfg.Storage.Main.Clickhouse != nil {
-		Cfg.Storage.Main.Clickhouse.ChainBasedConfig = orchestratorChainConfig
-	}
+	if Cfg.Storage.Orchestrator.Clickhouse != nil {
+		Cfg.Storage.Orchestrator.Clickhouse.ChainBasedConfig = orchestratorChainConfig
+	}
♻️ Duplicate comments (1)
configs/config.go (1)

298-306: ENV should override file; remove panic; parse env last.

Current flow loads .env, parses ENV into Cfg, then unmarshals file over it (file wins) and panics on parse error. Prefer precedence ENV > file and return errors instead of panicking. Also reduce noise when .env is absent.

@@
-	err := godotenv.Load()
-	if err != nil {
-		log.Info().Msg("No .env file found")
-	}
-	err = env.Parse(&Cfg)
-	if err != nil {
-		panic(err)
-	}
+	err := godotenv.Load()
+	if err != nil {
+		// .env is optional; keep logs quiet in prod
+		log.Debug().Err(err).Msg("Skipping .env load")
+	}
@@
-	err = viper.Unmarshal(&Cfg)
-	if err != nil {
-		return fmt.Errorf("error unmarshalling config: %v", err)
-	}
+	if err := viper.Unmarshal(&Cfg); err != nil {
+		return fmt.Errorf("error unmarshalling config: %w", err)
+	}
+
+	// Parse env-tagged fields last so ENV overrides file values.
+	if err := env.Parse(&Cfg); err != nil {
+		return fmt.Errorf("error parsing env into config: %w", err)
+	}

Also applies to: 333-336

🧹 Nitpick comments (2)
configs/config.go (2)

10-11: godotenv/env imports OK; keep .env usage dev-friendly.

Fine to include, but ensure missing .env is debug-level (see prior diff) to avoid prod noise.


276-293: Flattened committer ENV fields: consider scoping and consistency.

Top-level env-only fields mix concerns with existing Storage/Kafka/ClickHouse configs. Consider nesting under Committer (e.g., Committer.DB/Kafka/S3) for cohesion, align bool semantics (EnableTLS vs DisableTLS in other structs), and use consistent types (prefer int over int64 here unless >2B expected). Also ensure secrets are never logged.

Would you like a follow-up patch to group these under Committer and add redaction helpers for passwords?
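
A hypothetical shape for that grouping, assuming caarlos0/env's envPrefix and unset tag options (names are illustrative, not the current fields):

```go
package configs

// Illustrative nesting only; actual field names in configs/config.go differ.
type CommitterConfig struct {
	Clickhouse struct {
		Host      string `env:"HOST"`
		Port      int    `env:"PORT" envDefault:"9440"`
		EnableTLS bool   `env:"ENABLE_TLS" envDefault:"true"`
	} `envPrefix:"COMMITTER_CLICKHOUSE_"`
	Kafka struct {
		Brokers  string `env:"BROKERS"`
		Password string `env:"PASSWORD,unset"` // unset after read to reduce leak surface
	} `envPrefix:"COMMITTER_KAFKA_"`
}
```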

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between b8be8e4 and 6aa9911.

📒 Files selected for processing (1)
  • configs/config.go (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test
🔇 Additional comments (1)
configs/config.go (1)

327-336: Confirm Viper env overlay behavior with Unmarshal.

rg -nP 'viper\s*\.\s*BindEnv\s*\(' — no matches found. AutomaticEnv + viper.Unmarshal won't reliably override nested mapstructure keys; Unmarshal doesn't pick up AutomaticEnv unless keys are explicitly bound. You now use caarlos0/env (post-Unmarshal) which covers new env-tagged fields; for legacy keys that must be overridable by ENV either add explicit viper.BindEnv(...) for those keys or migrate them to caarlos0/env.
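
A minimal sketch of the explicit binding approach; the keys and variable names below are examples, not the repo's actual config keys:

```go
package configs

import "github.com/spf13/viper"

// Explicitly bind the legacy nested keys that must stay ENV-overridable,
// so viper.Unmarshal sees the environment values.
func bindLegacyEnv() {
	// Each call maps a nested mapstructure key to an environment variable.
	_ = viper.BindEnv("storage.main.clickhouse.host", "STORAGE_MAIN_CLICKHOUSE_HOST")
	_ = viper.BindEnv("storage.main.kafka.brokers", "STORAGE_MAIN_KAFKA_BROKERS")
}
```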
