Skip to content

Commit 86334b0

Browse files
committed
compress messages
1 parent 714b211 commit 86334b0

File tree

1 file changed

+11
-1
lines changed

1 file changed

+11
-1
lines changed

internal/storage/kafka_publisher.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sync"
1111
"time"
1212

13+
"github.com/klauspost/compress/zstd"
1314
"github.com/rs/zerolog/log"
1415
config "github.com/thirdweb-dev/indexer/configs"
1516
"github.com/thirdweb-dev/indexer/internal/common"
@@ -306,19 +307,28 @@ func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber ui
306307
}
307308

308309
func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, blockNumber uint64, timestamp time.Time, msgJson []byte) (*kgo.Record, error) {
310+
encoder, err := zstd.NewWriter(nil)
311+
if err != nil {
312+
log.Fatal().Err(err).Msg("failed to create zstd encoder")
313+
}
314+
defer encoder.Close()
315+
316+
compressed := encoder.EncodeAll([]byte(msgJson), nil)
317+
309318
// Create headers with metadata
310319
headers := []kgo.RecordHeader{
311320
{Key: "chain_id", Value: []byte(fmt.Sprintf("%d", chainId))},
312321
{Key: "block_number", Value: []byte(fmt.Sprintf("%d", blockNumber))},
313322
{Key: "type", Value: []byte(fmt.Sprintf("%s", msgType))},
314323
{Key: "timestamp", Value: []byte(timestamp.Format(time.RFC3339Nano))},
315324
{Key: "schema_version", Value: []byte("1")},
325+
{Key: "content-type", Value: []byte("zstd")},
316326
}
317327

318328
return &kgo.Record{
319329
Topic: fmt.Sprintf("insight.commit.blocks.%d", chainId),
320330
Key: []byte(fmt.Sprintf("%d:%s:%d", chainId, msgType, blockNumber)),
321-
Value: msgJson,
331+
Value: compressed,
322332
Headers: headers,
323333
Partition: 0,
324334
}, nil

0 commit comments

Comments
 (0)