Skip to content

Commit 2d1874a

Browse files
authored
[log] add server side checksum calculation time (#2523)
1 parent 42d5245 commit 2d1874a

File tree

3 files changed

+22
-39
lines changed

3 files changed

+22
-39
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -215,11 +215,9 @@ private void processPeersSequentially(
215215

216216
// Attempt to fetch the blob from the current peer asynchronously
217217
LOGGER.info(
218-
"Attempting to connect to host: {} for store {} version {} partition {} table format {}",
218+
"Attempting to connect to host: {} for replica {} table format {}",
219219
chosenHost,
220-
storeName,
221-
version,
222-
partition,
220+
replicaId,
223221
tableFormat);
224222

225223
CompletionStage<InputStream> perHostTransferFuture =
@@ -315,10 +313,8 @@ private void updateBlobTransferFileReceiveStats(double transferTime, String stor
315313
aggVersionedBlobTransferStats.recordBlobTransferFileReceiveThroughput(storeName, version, throughput);
316314
} catch (Exception e) {
317315
LOGGER.error(
318-
"Failed to update updateBlobTransferFileReceiveStats for store {} version {} partition {}",
319-
storeName,
320-
version,
321-
partition,
316+
"Failed to update updateBlobTransferFileReceiveStats for replica {}",
317+
Utils.getReplicaId(Version.composeKafkaTopic(storeName, version), partition),
322318
e);
323319
}
324320
}
@@ -334,25 +330,18 @@ private void updateBlobTransferFileReceiveStats(double transferTime, String stor
334330
private List<String> getConnectableHosts(List<String> discoverPeers, String storeName, int version, int partition) {
335331
// Extract unique hosts from the discovered peers
336332
Set<String> uniquePeers = discoverPeers.stream().map(peer -> peer.split("_")[0]).collect(Collectors.toSet());
333+
String replicaId = Utils.getReplicaId(Version.composeKafkaTopic(storeName, version), partition);
337334

338-
LOGGER.info(
339-
"Discovered {} unique peers store {} version {} partition {}, peers are {}",
340-
uniquePeers.size(),
341-
storeName,
342-
version,
343-
partition,
344-
uniquePeers);
335+
LOGGER.info("Discovered {} unique peers for replica {}, peers are {}", uniquePeers.size(), replicaId, uniquePeers);
345336

346337
// Get the connectable hosts for this store, version, and partition
347338
Set<String> connectablePeers =
348339
nettyClient.getConnectableHosts((HashSet<String>) uniquePeers, storeName, version, partition);
349340

350341
LOGGER.info(
351-
"Total {} unique connectable peers for store {} version {} partition {}, peers are {}",
342+
"Total {} unique connectable peers for replica {}, peers are {}",
352343
connectablePeers.size(),
353-
storeName,
354-
version,
355-
partition,
344+
replicaId,
356345
connectablePeers);
357346

358347
// Change to list and shuffle the list

clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ public Set<String> getConnectableHosts(
161161
int version,
162162
int partition) {
163163
List<CompletableFuture<String>> futures = new ArrayList<>();
164+
String replicaId = Utils.getReplicaId(storeName, version, partition);
164165

165166
// 1. Purge the host connectivity records that are stale
166167
purgeStaleConnectivityRecords(unconnectableHostsToTimestamp);
@@ -200,23 +201,12 @@ public Set<String> getConnectableHosts(
200201
});
201202
return host;
202203
} else {
203-
LOGGER.warn(
204-
"Failed to connect to host: {} for store {} version {} partition {}",
205-
host,
206-
storeName,
207-
version,
208-
partition);
204+
LOGGER.warn("Failed to connect to host: {} for replica {}", host, replicaId);
209205
unconnectableHostsToTimestamp.put(host, System.currentTimeMillis());
210206
return null;
211207
}
212208
} catch (Exception e) {
213-
LOGGER.warn(
214-
"Failed to connect to host: {} for store {} version {} partition {}",
215-
host,
216-
storeName,
217-
version,
218-
partition,
219-
e);
209+
LOGGER.warn("Failed to connect to host: {} for replica {}", host, replicaId, e);
220210
unconnectableHostsToTimestamp.put(host, System.currentTimeMillis());
221211
return null;
222212
}
@@ -311,9 +301,8 @@ public CompletionStage<InputStream> get(
311301
.completeExceptionally(
312302
new VenicePeersConnectionException(
313303
"The host " + host
314-
+ " channel already have P2PFileTransferClientHandler/P2PMetadataTransferHandler for "
315-
+ storeName + " version " + version + " partition " + partition + " table format "
316-
+ requestedTableFormat));
304+
+ " channel already have P2PFileTransferClientHandler/P2PMetadataTransferHandler for replica "
305+
+ replicaId + " table format " + requestedTableFormat));
317306
return perHostTransferFuture;
318307
}
319308

@@ -358,10 +347,8 @@ public CompletionStage<InputStream> get(
358347
connectTimeoutScheduler.schedule(() -> {
359348
if (!perHostTransferFuture.toCompletableFuture().isDone()) {
360349
String errorMsg = String.format(
361-
"Request timed out for store %s version %d partition %d table format %s from host %s after %d minutes",
362-
storeName,
363-
version,
364-
partition,
350+
"Request timed out for replica %s table format %s from host %s after %d minutes",
351+
replicaId,
365352
requestedTableFormat,
366353
host,
367354
blobReceiveTimeoutInMin);

clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,14 @@ private void sendFile(
280280
RandomAccessFile raf = new RandomAccessFile(file, "r");
281281
ChannelFuture sendFileFuture;
282282
long length = raf.length();
283+
284+
long checksumStartTime = System.currentTimeMillis();
283285
String fileChecksum = BlobTransferUtils.generateFileChecksum(file.toPath());
286+
LOGGER.info(
287+
"Checksum calculation for file: {} for replica {} took {} ms.",
288+
file.getName(),
289+
replicaInfo,
290+
System.currentTimeMillis() - checksumStartTime);
284291

285292
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
286293
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, length);

0 commit comments

Comments
 (0)