Skip to content

Commit 4e46433

Browse files
haoxu07claude
andcommitted
[common] Fall back to producer timestamp when pub-sub system timestamp is unavailable

Venice currently relies on the pub-sub system (broker) timestamp for getPubSubMessageTime(), but this timestamp is not always available or accurate across all pub-sub systems. Venice already embeds a reliable producer timestamp inside every KafkaMessageEnvelope via producerMetadata.messageTimestamp, which is always set by VeniceWriter.

This change modifies PubSubMessageDeserializer to prefer the pub-sub system timestamp when it is available and positive, but fall back to the Venice producer timestamp when the pub-sub system provides no usable per-message timestamp (timestamp is null or not greater than 0).

This is a no-op for existing Kafka deployments with LOG_APPEND_TIME since broker timestamps are always valid and positive. For pub-sub systems that do not provide reliable per-message timestamps, this ensures all 35+ downstream consumers of getPubSubMessageTime() — including DIV validation, latency metrics, heartbeat processing, and CDC events — receive a meaningful timestamp without any individual code changes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0be1bda commit 4e46433

File tree

3 files changed

+53
-2
lines changed

3 files changed

+53
-2
lines changed

internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessage.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ default POSITION getOffset() {
3030
POSITION getPosition();
3131

3232
/**
33-
* @return the timestamp at which the message was persisted in the pub sub system
33+
* @return the best-available message timestamp. This is the pub-sub system timestamp when available and
34+
* positive, otherwise the Venice producer timestamp embedded in the message envelope.
3435
*/
3536
long getPubSubMessageTime();
3637

internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,17 @@ public DefaultPubSubMessage deserialize(
8181
if (value == null) {
8282
value = valueSerializer.deserialize(valueBytes, getEnvelope(key.getKeyHeaderByte()));
8383
}
84+
// Use the pub-sub system timestamp when it is present and positive; otherwise fall back to Venice's producer timestamp.
85+
// Some pub-sub systems do not provide reliable per-message timestamps. Venice always embeds a
86+
// producer timestamp in the KafkaMessageEnvelope, so we can fall back to it.
87+
long effectiveTimestamp = timestamp != null && timestamp > 0 ? timestamp : value.producerMetadata.messageTimestamp;
8488
// TODO: Put the message container in an object pool as well
8589
return new ImmutablePubSubMessage(
8690
key,
8791
value,
8892
topicPartition,
8993
pubSubPosition,
90-
timestamp,
94+
effectiveTimestamp,
9195
keyBytes.length + valueBytes.length,
9296
headers);
9397
}

internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializerTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,52 @@ public void testDeserializerValueWithInvalidSchemaFromPubSubMessageHeaders() {
132132
assertEquals(message.getPosition(), position);
133133
}
134134

135+
@Test
136+
public void testDeserializerFallsBackToProducerTimestampWhenBrokerTimestampIsZero() {
137+
KafkaKey key = new KafkaKey(MessageType.PUT, "key".getBytes());
138+
KafkaMessageEnvelope value = getDummyValue();
139+
long producerTimestamp = 1700000000000L;
140+
value.producerMetadata.messageTimestamp = producerTimestamp;
141+
142+
// When broker timestamp is 0, should fall back to producer timestamp
143+
DefaultPubSubMessage message = messageDeserializer.deserialize(
144+
topicPartition,
145+
keySerializer.serialize("test", key),
146+
valueSerializer.serialize("test", value),
147+
new PubSubMessageHeaders(),
148+
position,
149+
0L);
150+
assertEquals(message.getPubSubMessageTime(), producerTimestamp);
151+
152+
// When broker timestamp is valid, should use broker timestamp
153+
long brokerTimestamp = 1700000001000L;
154+
message = messageDeserializer.deserialize(
155+
topicPartition,
156+
keySerializer.serialize("test", key),
157+
valueSerializer.serialize("test", value),
158+
new PubSubMessageHeaders(),
159+
position,
160+
brokerTimestamp);
161+
assertEquals(message.getPubSubMessageTime(), brokerTimestamp);
162+
}
163+
164+
@Test
165+
public void testDeserializerFallsBackToProducerTimestampWhenBrokerTimestampIsNull() {
166+
KafkaKey key = new KafkaKey(MessageType.PUT, "key".getBytes());
167+
KafkaMessageEnvelope value = getDummyValue();
168+
long producerTimestamp = 1700000000000L;
169+
value.producerMetadata.messageTimestamp = producerTimestamp;
170+
171+
DefaultPubSubMessage message = messageDeserializer.deserialize(
172+
topicPartition,
173+
keySerializer.serialize("test", key),
174+
valueSerializer.serialize("test", value),
175+
new PubSubMessageHeaders(),
176+
position,
177+
null);
178+
assertEquals(message.getPubSubMessageTime(), producerTimestamp);
179+
}
180+
135181
private KafkaMessageEnvelope getDummyValue() {
136182
KafkaMessageEnvelope value = new KafkaMessageEnvelope();
137183
value.producerMetadata = new ProducerMetadata();

0 commit comments

Comments
 (0)