29 changes: 29 additions & 0 deletions cmd/committer.go
@@ -0,0 +1,29 @@
package cmd

import (
"fmt"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/thirdweb-dev/indexer/internal/committer"
"github.com/thirdweb-dev/indexer/internal/rpc"
)

var committerCmd = &cobra.Command{
Use: "committer",
Short: "run committer",
Long: "Publishes data from S3 to Kafka. If a block is not found in S3, it will panic.",
Run: RunCommitter,
}

func RunCommitter(cmd *cobra.Command, args []string) {
fmt.Println("running committer")
rpc, err := rpc.Initialize()
if err != nil {
log.Fatal().Err(err).Msg("Failed to initialize RPC")
}
chainId := rpc.GetChainID()

committer.Init(chainId, rpc)
committer.Commit(chainId)
}
Comment on lines +19 to +29

⚠️ Potential issue

Add proper error handling and resource cleanup.

The function should properly handle errors and clean up resources when the committer finishes or encounters an error.

 func RunCommitter(cmd *cobra.Command, args []string) {
 	fmt.Println("running committer")
 	rpc, err := rpc.Initialize()
 	if err != nil {
 		log.Fatal().Err(err).Msg("Failed to initialize RPC")
 	}
+	defer rpc.Close()
+	
 	chainId := rpc.GetChainID()
 
 	committer.Init(chainId, rpc)
-	committer.Commit(chainId)
+	if err := committer.Commit(chainId); err != nil {
+		committer.Close()
+		log.Fatal().Err(err).Msg("Committer failed")
+	}
+	if err := committer.Close(); err != nil {
+		log.Error().Err(err).Msg("Failed to close committer cleanly")
+	}
 }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In cmd/committer.go around lines 19 to 29, the function lacks proper error
handling and resource cleanup: ensure rpc is closed on exit and handle errors
from Init and Commit. After rpc.Initialize(), if err != nil keep the current
fatal log; otherwise defer a safe rpc.Close() (or Close context) to release
resources. Check and handle errors returned by committer.Init and
committer.Commit (log and exit non‑zero or return the error) instead of ignoring
them, and ensure cleanup runs on both success and error paths.

1 change: 1 addition & 0 deletions cmd/root.go
@@ -429,6 +429,7 @@ func init() {

rootCmd.AddCommand(orchestratorCmd)
rootCmd.AddCommand(apiCmd)
rootCmd.AddCommand(committerCmd)
rootCmd.AddCommand(validateAndFixCmd)
rootCmd.AddCommand(validateCmd)
rootCmd.AddCommand(migrateValidationCmd)
30 changes: 29 additions & 1 deletion configs/config.go
@@ -7,6 +7,8 @@ import (
"strings"
"time"

"github.com/caarlos0/env"
"github.com/joho/godotenv"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
)
@@ -270,6 +272,37 @@ type Config struct {
Publisher PublisherConfig `mapstructure:"publisher"`
Validation ValidationConfig `mapstructure:"validation"`
Migrator MigratorConfig `mapstructure:"migrator"`

CommitterClickhouseDatabase string `env:"COMMITTER_CLICKHOUSE_DATABASE"`
CommitterClickhouseHost string `env:"COMMITTER_CLICKHOUSE_HOST"`
CommitterClickhousePort int `env:"COMMITTER_CLICKHOUSE_PORT"`
CommitterClickhouseUsername string `env:"COMMITTER_CLICKHOUSE_USERNAME"`
CommitterClickhousePassword string `env:"COMMITTER_CLICKHOUSE_PASSWORD"`
CommitterClickhouseEnableTLS bool `env:"COMMITTER_CLICKHOUSE_ENABLE_TLS" envDefault:"true"`
CommitterKafkaBrokers string `env:"COMMITTER_KAFKA_BROKERS"`
CommitterKafkaUsername string `env:"COMMITTER_KAFKA_USERNAME"`
CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"`
CommitterKafkaEnableTLS bool `env:"COMMITTER_KAFKA_ENABLE_TLS" envDefault:"true"`
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`
StagingS3SecretAccessKey string `env:"STAGING_S3_SECRET_ACCESS_KEY"`
StagingS3MaxParallelFileDownload int `env:"STAGING_S3_MAX_PARALLEL_FILE_DOWNLOAD" envDefault:"2"`
CommitterRPCNumParallelCalls int64 `env:"COMMITTER_RPC_NUM_PARALLEL_CALLS" envDefault:"10"`
}

var Cfg Config

func LoadConfig(cfgFile string) error {
err := godotenv.Load()
if err != nil {
log.Info().Msg("No .env file found")
}
err = env.Parse(&Cfg)
if err != nil {
panic(err)
}

Comment on lines +298 to +306

🛠️ Refactor suggestion

Set precedence to ENV > file and avoid panic; parse env last and return errors.

Current flow parses env first, then unmarshals file (file overrides env) and panics on parse errors. Common precedence is ENV > file, and this function already returns errors.

@@
-	err := godotenv.Load()
-	if err != nil {
-		log.Info().Msg("No .env file found")
-	}
-	err = env.Parse(&Cfg)
-	if err != nil {
-		panic(err)
-	}
+	err := godotenv.Load()
+	if err != nil {
+		// .env is optional; keep noise low but retain diagnostics
+		log.Debug().Err(err).Msg("Skipping .env load")
+	}
@@
-	err = viper.Unmarshal(&Cfg)
+	err = viper.Unmarshal(&Cfg)
 	if err != nil {
 		return fmt.Errorf("error unmarshalling config: %v", err)
 	}
+
+	// Parse env-tagged fields last so ENV overrides file values.
+	if err := env.Parse(&Cfg); err != nil {
+		return fmt.Errorf("error parsing env into config: %w", err)
+	}

Also applies to: 332-336

🤖 Prompt for AI Agents
In configs/config.go around lines 297-305 (and similarly 332-336), the current
sequence calls godotenv.Load(), then env.Parse(&Cfg) so file values override
environment and the code panics on parse errors; change the flow so environment
variables take precedence by parsing env last and do not panic—load the file
first into a temporary struct or map, then call env.Parse(&Cfg) to overwrite
with ENV values, propagate and return any parse error instead of panicking, and
adjust both affected blocks accordingly.

if cfgFile != "" {
viper.SetConfigFile(cfgFile)
if err := viper.ReadInConfig(); err != nil {
@@ -301,7 +329,7 @@ func LoadConfig(cfgFile string) error {

viper.AutomaticEnv()

err := viper.Unmarshal(&Cfg)
err = viper.Unmarshal(&Cfg)
if err != nil {
return fmt.Errorf("error unmarshalling config: %v", err)
}
2 changes: 2 additions & 0 deletions go.mod
@@ -52,6 +52,7 @@ require (
github.com/bits-and-blooms/bitset v1.20.0 // indirect
github.com/bytedance/sonic v1.12.6 // indirect
github.com/bytedance/sonic/loader v0.2.1 // indirect
github.com/caarlos0/env v3.5.0+incompatible // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
@@ -97,6 +98,7 @@ require (
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.0 // indirect
4 changes: 4 additions & 0 deletions go.sum
@@ -61,6 +61,8 @@ github.com/bytedance/sonic v1.12.6/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKz
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/bytedance/sonic/loader v0.2.1 h1:1GgorWTqf12TA8mma4DDSbaQigE2wOgQo7iCjjJv3+E=
github.com/bytedance/sonic/loader v0.2.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/caarlos0/env v3.5.0+incompatible h1:Yy0UN8o9Wtr/jGHZDpCBLpNrzcFLLM2yixi/rBrKyJs=
github.com/caarlos0/env v3.5.0+incompatible/go.mod h1:tdCsowwCzMLdkqRYDlHpZCp2UooDD3MspDBjZ2AD02Y=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
@@ -212,6 +214,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
180 changes: 180 additions & 0 deletions internal/committer/README.md
@@ -0,0 +1,180 @@
# Committer Package

This package implements a committer that processes block data from S3 parquet files and publishes them to Kafka. It follows the requirements specified in the original comments.

## Features

- **ClickHouse Integration**: Gets the maximum block number from ClickHouse for the chain
- **S3 File Discovery**: Lists parquet files from S3 with chain-specific prefixes
- **Block Range Parsing**: Extracts start and end block numbers from S3 filenames
- **File Filtering**: Skips files where end block is less than max block number from ClickHouse
- **Sequential Processing**: Processes files in ascending order by start block number
- **Memory-Efficient Streaming**: Streams parquet files row-by-row to minimize memory usage
- **Kafka Publishing**: Publishes processed block data to Kafka
- **Error Handling**: Comprehensive error handling with detailed logging

## Usage

### Basic Usage

```go
package main

import (
"context"
"math/big"
"log"

"github.com/thirdweb-dev/indexer/internal/committer"
"github.com/thirdweb-dev/indexer/configs"
)

func main() {
// Load configuration
if err := configs.LoadConfig("config.yml"); err != nil {
log.Fatal("Failed to load config:", err)
}

// Create committer for chain ID 1 (Ethereum mainnet)
chainId := big.NewInt(1)
committer, err := committer.NewCommitterFromConfig(chainId)
if err != nil {
log.Fatal("Failed to create committer:", err)
}
defer committer.Close()

// Process blocks
ctx := context.Background()
if err := committer.ProcessBlocks(ctx); err != nil {
log.Fatal("Failed to process blocks:", err)
}
}
```
Comment on lines +20 to +52

⚠️ Potential issue

Examples reference non-existent APIs; align with actual public surface.

README shows NewCommitterFromConfig/NewCommitter/ProcessBlocks, but the code exposes Init/Commit (and a CLI subcommand). Please update the examples to either:

  • demonstrate the CLI: insight committer with env-config, or
  • show programmatic usage via committer.Init(chainID, rpc) then committer.Commit(chainID) (and Close()), matching cmd/committer.go.

I can draft a corrected snippet once you confirm the intended public API.
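Pending that confirmation, a minimal sketch of the programmatic path the CLI takes today, mirroring cmd/committer.go in this PR (the variable is renamed so it does not shadow the rpc package; the Close calls suggested above are omitted because they may not exist yet):

```go
package main

import (
	"github.com/rs/zerolog/log"

	"github.com/thirdweb-dev/indexer/internal/committer"
	"github.com/thirdweb-dev/indexer/internal/rpc"
)

func main() {
	// Mirrors RunCommitter in cmd/committer.go.
	rpcClient, err := rpc.Initialize()
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to initialize RPC")
	}
	chainId := rpcClient.GetChainID()

	committer.Init(chainId, rpcClient)
	committer.Commit(chainId)
}
```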

Also applies to: 56-104


### Advanced Usage with Custom Configuration

```go
package main

import (
"context"
"math/big"
"log"

"github.com/thirdweb-dev/indexer/internal/committer"
"github.com/thirdweb-dev/indexer/configs"
)

func main() {
// Custom configuration
chainId := big.NewInt(137) // Polygon

clickhouseConfig := &configs.ClickhouseConfig{
Host: "localhost",
Port: 9000,
Username: "default",
Password: "",
Database: "insight",
}

s3Config := &configs.S3Config{
Bucket: "thirdweb-insight-production",
Region: "us-east-1",
AccessKeyID: "your-access-key",
SecretAccessKey: "your-secret-key",
}

kafkaConfig := &configs.KafkaConfig{
Brokers: "localhost:9092",
}

// Create committer
committer, err := committer.NewCommitter(chainId, clickhouseConfig, s3Config, kafkaConfig)
if err != nil {
log.Fatal("Failed to create committer:", err)
}
defer committer.Close()

// Process blocks
ctx := context.Background()
if err := committer.ProcessBlocks(ctx); err != nil {
log.Fatal("Failed to process blocks:", err)
}
}
```

## Configuration Requirements

The committer requires the following configuration:

### ClickHouse Configuration
- Host, Port, Username, Password, Database
- Used to query the maximum block number for the chain

### S3 Configuration
- Bucket name (e.g., "thirdweb-insight-production")
- Region, Access Key ID, Secret Access Key
- Used to list and download parquet files

### Kafka Configuration
- Brokers list
- Used to publish processed block data

## S3 File Structure

The committer expects S3 files to follow this naming pattern:
```
chain_${chainId}/year=2024/blocks_1000_2000.parquet
```

Where:
- `chain_${chainId}` is the prefix for the chain
- `year=2024` is the partitioning by year
- `blocks_1000_2000.parquet` contains blocks from 1000 to 2000
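As an illustration, a minimal sketch of extracting the block range from such a key; the helper name and error handling are assumptions, not the package's actual parser:

```go
package main

import (
	"fmt"
	"path"
	"strconv"
	"strings"
)

// parseBlockRange extracts the start and end block numbers from a key like
// "chain_1/year=2024/blocks_1000_2000.parquet". Hypothetical helper for
// illustration; the package's real parsing logic may differ.
func parseBlockRange(key string) (start, end uint64, err error) {
	name := strings.TrimSuffix(path.Base(key), ".parquet") // "blocks_1000_2000"
	parts := strings.Split(name, "_")                      // ["blocks", "1000", "2000"]
	if len(parts) != 3 || parts[0] != "blocks" {
		return 0, 0, fmt.Errorf("unexpected key format: %q", key)
	}
	if start, err = strconv.ParseUint(parts[1], 10, 64); err != nil {
		return 0, 0, err
	}
	if end, err = strconv.ParseUint(parts[2], 10, 64); err != nil {
		return 0, 0, err
	}
	return start, end, nil
}

func main() {
	start, end, err := parseBlockRange("chain_1/year=2024/blocks_1000_2000.parquet")
	fmt.Println(start, end, err) // 1000 2000 <nil>
}
```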

## Parquet File Structure

The parquet files should contain the following columns:
- `chain_id` (uint64): Chain identifier
- `block_number` (uint64): Block number
- `block_hash` (string): Block hash
- `block_timestamp` (int64): Block timestamp
- `block_json` (bytes): Serialized block data
- `transactions_json` (bytes): Serialized transactions data
- `logs_json` (bytes): Serialized logs data
- `traces_json` (bytes): Serialized traces data
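A hypothetical Go row type mirroring these columns might look like the following; the struct-tag syntax assumes a parquet-go-style library, and the type actually used by the package may differ:

```go
package committer

// BlockRow mirrors the parquet columns listed above. Illustrative only.
type BlockRow struct {
	ChainID          uint64 `parquet:"chain_id"`
	BlockNumber      uint64 `parquet:"block_number"`
	BlockHash        string `parquet:"block_hash"`
	BlockTimestamp   int64  `parquet:"block_timestamp"`
	BlockJSON        []byte `parquet:"block_json"`
	TransactionsJSON []byte `parquet:"transactions_json"`
	LogsJSON         []byte `parquet:"logs_json"`
	TracesJSON       []byte `parquet:"traces_json"`
}
```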

## Processing Flow

1. **Query ClickHouse**: Get the maximum block number for the chain
2. **List S3 Files**: Find all parquet files with the chain prefix
3. **Filter Files**: Skip files where end block ≤ max block number
4. **Sort Files**: Order by start block number (ascending)
5. **Process Sequentially**: For each file:
- Download from S3 to local storage
- Stream parquet file row-by-row
- Skip blocks < next commit block number
- Error if block > next commit block number (missing data)
- Publish found blocks to Kafka
- Increment commit block number
- Clean up local file
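The per-row sequencing rule in step 5 can be sketched roughly as follows (hypothetical names, not the package's actual code): rows below the next commit block are skipped, a gap above it is an error, and a match is published and advances the cursor.

```go
package main

import "fmt"

// commitRows applies the sequencing rule to one file's rows, identified here
// only by block number. Illustrative sketch.
func commitRows(nextCommit uint64, rows []uint64, publish func(uint64) error) (uint64, error) {
	for _, blockNumber := range rows {
		switch {
		case blockNumber < nextCommit:
			continue // already committed; skip
		case blockNumber > nextCommit:
			return nextCommit, fmt.Errorf("missing block %d (found %d)", nextCommit, blockNumber)
		default:
			if err := publish(blockNumber); err != nil {
				return nextCommit, err
			}
			nextCommit++ // advance the commit cursor
		}
	}
	return nextCommit, nil
}

func main() {
	next, err := commitRows(1000, []uint64{998, 999, 1000, 1001, 1002}, func(b uint64) error {
		fmt.Println("published", b)
		return nil
	})
	fmt.Println(next, err) // 1003 <nil>
}
```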

## Error Handling

The committer includes comprehensive error handling:
- Missing configuration validation
- S3 connection and download errors
- Parquet file parsing errors
- Kafka publishing errors
- Block sequence validation errors

All errors are logged with detailed context for debugging.

## Memory Management

The committer is designed to be memory-efficient:
- Downloads files directly to disk (no in-memory buffering)
- Streams parquet files row-by-row
- Processes one file at a time
- Cleans up local files after processing
- Uses semaphores to limit concurrent operations
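A buffered channel is one common way to implement such a semaphore. The snippet below is a generic sketch; the limit of 2 echoes the `STAGING_S3_MAX_PARALLEL_FILE_DOWNLOAD` default added in this PR, but the package's real mechanism may differ.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	// Channel-based semaphore capping concurrent downloads at 2; illustrative only.
	const maxParallel = 2
	sem := make(chan struct{}, maxParallel)
	var wg sync.WaitGroup

	files := []string{"blocks_0_999.parquet", "blocks_1000_1999.parquet", "blocks_2000_2999.parquet"}
	for _, f := range files {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release the slot when done
			fmt.Println("downloading", name)
		}(f)
	}
	wg.Wait()
}
```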