Skip to content

Commit 385f7d7

Browse files
authored
nil block (#301)
1 parent 98849e3 commit 385f7d7

File tree

1 file changed

+19
-4
lines changed

1 file changed

+19
-4
lines changed

internal/storage/kafka_publisher.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -247,18 +247,29 @@ func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDelet
247247

248248
publishStart := time.Now()
249249

250-
// Prepare messages for blocks, events, transactions and traces
251-
blockMessages := make([]*kgo.Record, len(blockData))
250+
// Filter out nil blocks and prepare messages
251+
blockMessages := make([]*kgo.Record, 0, len(blockData))
252+
253+
for _, data := range blockData {
254+
// Skip nil blocks
255+
if data == nil {
256+
log.Warn().Msg("Skipping nil block in publishBlockData")
257+
continue
258+
}
252259

253-
for i, data := range blockData {
254260
// Block message
255261
if blockMsg, err := p.createBlockDataMessage(data, isDeleted, isReorg); err == nil {
256-
blockMessages[i] = blockMsg
262+
blockMessages = append(blockMessages, blockMsg)
257263
} else {
258264
return fmt.Errorf("failed to create block message: %v", err)
259265
}
260266
}
261267

268+
if len(blockMessages) == 0 {
269+
log.Warn().Msg("No valid blocks to publish after filtering")
270+
return nil
271+
}
272+
262273
if err := p.publishMessages(context.Background(), blockMessages); err != nil {
263274
return fmt.Errorf("failed to publish block messages: %v", err)
264275
}
@@ -268,6 +279,10 @@ func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDelet
268279
}
269280

270281
func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDeleted bool, isReorg bool) (*kgo.Record, error) {
282+
if block == nil {
283+
return nil, fmt.Errorf("block is nil")
284+
}
285+
271286
timestamp := time.Now()
272287

273288
data := PublishableMessageBlockData{

0 commit comments

Comments
 (0)