Skip to content

Commit a306475

Browse files
committed
conditional compression threshold
1 parent 00df078 commit a306475

File tree

2 files changed: 20 additions (+), 8 deletions (−)

configs/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ type Config struct {
294294
CommitterKafkaPassword string `env:"COMMITTER_KAFKA_PASSWORD"`
295295
CommitterKafkaEnableTLS bool `env:"COMMITTER_KAFKA_ENABLE_TLS" envDefault:"true"`
296296
CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"`
297+
CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"`
297298
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
298299
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
299300
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`

internal/storage/kafka_publisher.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -307,13 +307,24 @@ func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber ui
307307
}
308308

309309
func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, blockNumber uint64, timestamp time.Time, msgJson []byte) (*kgo.Record, error) {
310-
encoder, err := zstd.NewWriter(nil)
311-
if err != nil {
312-
log.Fatal().Err(err).Msg("failed to create zstd encoder")
313-
}
314-
defer encoder.Close()
310+
compressionThreshold := config.Cfg.CommitterCompressionThresholdMB * 1024 * 1024
315311

316-
compressed := encoder.EncodeAll([]byte(msgJson), nil)
312+
var value []byte
313+
var contentType string
314+
315+
if len(msgJson) >= compressionThreshold {
316+
encoder, err := zstd.NewWriter(nil)
317+
if err != nil {
318+
log.Fatal().Err(err).Msg("failed to create zstd encoder")
319+
}
320+
defer encoder.Close()
321+
322+
value = encoder.EncodeAll([]byte(msgJson), nil)
323+
contentType = "zstd"
324+
} else {
325+
value = msgJson
326+
contentType = "json"
327+
}
317328

318329
// Create headers with metadata
319330
headers := []kgo.RecordHeader{
@@ -322,13 +333,13 @@ func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, block
322333
{Key: "type", Value: []byte(fmt.Sprintf("%s", msgType))},
323334
{Key: "timestamp", Value: []byte(timestamp.Format(time.RFC3339Nano))},
324335
{Key: "schema_version", Value: []byte("1")},
325-
{Key: "content-type", Value: []byte("zstd")},
336+
{Key: "content-type", Value: []byte(contentType)},
326337
}
327338

328339
return &kgo.Record{
329340
Topic: fmt.Sprintf("insight.commit.blocks.%d", chainId),
330341
Key: []byte(fmt.Sprintf("%d:%s:%d", chainId, msgType, blockNumber)),
331-
Value: compressed,
342+
Value: value,
332343
Headers: headers,
333344
Partition: 0,
334345
}, nil

0 commit comments

Comments (0)