Skip to content

Commit d5ea5e9

Browse files
authored
Adds DELETE and HEAD instrumentation to CLI (#18206)
## Which issue does this PR close? This does not fully close, but is an incremental building block component for: - #17207 The full context of how this code is likely to progress can be seen in the POC for this effort: - #17266 ## Rationale for this change Further fills out method instrumentation ## What changes are included in this PR? - Adds instrumentation to head requests in the instrumented object store - Adds instrumentatin to delete requests in the instrumented object store - Adds tests for new code ## Are these changes tested? Yes. New unit tests have been added. ## Are there any user-facing changes? No-ish ## cc @alamb
1 parent 1e30aed commit d5ea5e9

File tree

3 files changed

+111
-4
lines changed

3 files changed

+111
-4
lines changed

datafusion-cli/src/object_storage/instrumented.rs

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,25 @@ impl InstrumentedObjectStore {
194194
Ok(ret)
195195
}
196196

197+
async fn instrumented_delete(&self, location: &Path) -> Result<()> {
198+
let timestamp = Utc::now();
199+
let start = Instant::now();
200+
self.inner.delete(location).await?;
201+
let elapsed = start.elapsed();
202+
203+
self.requests.lock().push(RequestDetails {
204+
op: Operation::Delete,
205+
path: location.clone(),
206+
timestamp,
207+
duration: Some(elapsed),
208+
size: None,
209+
range: None,
210+
extra_display: None,
211+
});
212+
213+
Ok(())
214+
}
215+
197216
fn instrumented_list(
198217
&self,
199218
prefix: Option<&Path>,
@@ -235,6 +254,25 @@ impl InstrumentedObjectStore {
235254

236255
Ok(ret)
237256
}
257+
258+
async fn instrumented_head(&self, location: &Path) -> Result<ObjectMeta> {
259+
let timestamp = Utc::now();
260+
let start = Instant::now();
261+
let ret = self.inner.head(location).await?;
262+
let elapsed = start.elapsed();
263+
264+
self.requests.lock().push(RequestDetails {
265+
op: Operation::Head,
266+
path: location.clone(),
267+
timestamp,
268+
duration: Some(elapsed),
269+
size: None,
270+
range: None,
271+
extra_display: None,
272+
});
273+
274+
Ok(ret)
275+
}
238276
}
239277

240278
impl fmt::Display for InstrumentedObjectStore {
@@ -285,6 +323,10 @@ impl ObjectStore for InstrumentedObjectStore {
285323
}
286324

287325
async fn delete(&self, location: &Path) -> Result<()> {
326+
if self.enabled() {
327+
return self.instrumented_delete(location).await;
328+
}
329+
288330
self.inner.delete(location).await
289331
}
290332

@@ -313,6 +355,10 @@ impl ObjectStore for InstrumentedObjectStore {
313355
}
314356

315357
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
358+
if self.enabled() {
359+
return self.instrumented_head(location).await;
360+
}
361+
316362
self.inner.head(location).await
317363
}
318364
}
@@ -321,9 +367,9 @@ impl ObjectStore for InstrumentedObjectStore {
321367
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
322368
pub enum Operation {
323369
_Copy,
324-
_Delete,
370+
Delete,
325371
Get,
326-
_Head,
372+
Head,
327373
List,
328374
Put,
329375
}
@@ -753,6 +799,35 @@ mod tests {
753799
assert!(request.extra_display.is_none());
754800
}
755801

802+
#[tokio::test]
803+
async fn instrumented_store_delete() {
804+
let (instrumented, path) = setup_test_store().await;
805+
806+
// By default no requests should be instrumented/stored
807+
assert!(instrumented.requests.lock().is_empty());
808+
instrumented.delete(&path).await.unwrap();
809+
assert!(instrumented.requests.lock().is_empty());
810+
811+
// We need a new store so we have data to delete again
812+
let (instrumented, path) = setup_test_store().await;
813+
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
814+
assert!(instrumented.requests.lock().is_empty());
815+
instrumented.delete(&path).await.unwrap();
816+
assert_eq!(instrumented.requests.lock().len(), 1);
817+
818+
let mut requests = instrumented.take_requests();
819+
assert_eq!(requests.len(), 1);
820+
assert!(instrumented.requests.lock().is_empty());
821+
822+
let request = requests.pop().unwrap();
823+
assert_eq!(request.op, Operation::Delete);
824+
assert_eq!(request.path, path);
825+
assert!(request.duration.is_some());
826+
assert!(request.size.is_none());
827+
assert!(request.range.is_none());
828+
assert!(request.extra_display.is_none());
829+
}
830+
756831
#[tokio::test]
757832
async fn instrumented_store_list() {
758833
let (instrumented, path) = setup_test_store().await;
@@ -865,6 +940,33 @@ mod tests {
865940
assert!(request.extra_display.is_none());
866941
}
867942

943+
#[tokio::test]
944+
async fn instrumented_store_head() {
945+
let (instrumented, path) = setup_test_store().await;
946+
947+
// By default no requests should be instrumented/stored
948+
assert!(instrumented.requests.lock().is_empty());
949+
let _ = instrumented.head(&path).await.unwrap();
950+
assert!(instrumented.requests.lock().is_empty());
951+
952+
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
953+
assert!(instrumented.requests.lock().is_empty());
954+
let _ = instrumented.head(&path).await.unwrap();
955+
assert_eq!(instrumented.requests.lock().len(), 1);
956+
957+
let mut requests = instrumented.take_requests();
958+
assert_eq!(requests.len(), 1);
959+
assert!(instrumented.requests.lock().is_empty());
960+
961+
let request = requests.pop().unwrap();
962+
assert_eq!(request.op, Operation::Head);
963+
assert_eq!(request.path, path);
964+
assert!(request.duration.is_some());
965+
assert!(request.size.is_none());
966+
assert!(request.range.is_none());
967+
assert!(request.extra_display.is_none());
968+
}
969+
868970
#[test]
869971
fn request_details() {
870972
let rd = RequestDetails {

datafusion-cli/tests/cli_integration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,8 +411,8 @@ async fn test_object_store_profiling() {
411411
// Output:
412412
// <TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
413413
settings.add_filter(
414-
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}:\d{2} operation=(Get|Put|Delete|List|Head) duration=\d+\.\d{6}s size=(\d+) path=(.*)",
415-
"<TIMESTAMP> operation=$1 duration=[DURATION] size=$2 path=$3",
414+
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}:\d{2} operation=(Get|Put|Delete|List|Head) duration=\d+\.\d{6}s (size=\d+\s+)?path=(.*)",
415+
"<TIMESTAMP> operation=$1 duration=[DURATION] ${2}path=$3",
416416
);
417417

418418
// We also need to filter out the summary statistics (anything with an 's' at the end)

datafusion-cli/tests/snapshots/object_store_profiling.snap

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ ObjectStore Profile mode set to Trace
3737

3838
Object Store Profiling
3939
Instrumented Object Store: instrument_mode: Trace, inner: AmazonS3(data)
40+
<TIMESTAMP> operation=Head duration=[DURATION] path=cars.csv
4041
<TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
4142

4243
Summaries:
@@ -45,6 +46,8 @@ Summaries:
4546
+-----------+----------+-----------+-----------+-----------+-----------+-------+
4647
| Get | duration | ...NORMALIZED...| 1 |
4748
| Get | size | 1006 B | 1006 B | 1006 B | 1006 B | 1 |
49+
| Head | duration | ...NORMALIZED...| 1 |
50+
| Head | size | | | | | 1 |
4851
+-----------+----------+-----------+-----------+-----------+-----------+-------+
4952
ObjectStore Profile mode set to Summary
5053
+-----+-------+---------------------+
@@ -63,6 +66,8 @@ Summaries:
6366
+-----------+----------+-----------+-----------+-----------+-----------+-------+
6467
| Get | duration | ...NORMALIZED...| 1 |
6568
| Get | size | 1006 B | 1006 B | 1006 B | 1006 B | 1 |
69+
| Head | duration | ...NORMALIZED...| 1 |
70+
| Head | size | | | | | 1 |
6671
+-----------+----------+-----------+-----------+-----------+-----------+-------+
6772
ObjectStore Profile mode set to Disabled
6873
+-----+-------+---------------------+

0 commit comments

Comments
 (0)