|
18 | 18 | //! mem_profile binary entrypoint
|
19 | 19 | use datafusion::error::Result;
|
20 | 20 | use std::{
|
| 21 | + env, |
21 | 22 | io::{BufRead, BufReader},
|
22 | 23 | process::{Command, Stdio},
|
23 | 24 | };
|
24 | 25 | use structopt::StructOpt;
|
25 | 26 |
|
26 |
| -#[derive(Debug, StructOpt)] |
27 |
| -#[structopt(about = "memory profile command")] |
28 |
| -struct MemProfileOpt { |
29 |
| - #[structopt(subcommand)] |
30 |
| - command: BenchmarkCommand, |
31 |
| -} |
32 |
| - |
33 |
| -#[derive(Debug, StructOpt)] |
34 |
| -enum BenchmarkCommand { |
35 |
| - Tpch(TpchOpt), |
36 |
| - // TODO Add other benchmark commands here |
37 |
| -} |
| 27 | +use datafusion_benchmarks::{ |
| 28 | + clickbench, |
| 29 | + h2o::{self, AllQueries}, |
| 30 | + imdb, sort_tpch, tpch, |
| 31 | +}; |
38 | 32 |
|
39 | 33 | #[derive(Debug, StructOpt)]
|
40 |
| -struct TpchOpt { |
41 |
| - #[structopt(long, required = true)] |
42 |
| - path: String, |
43 |
| - |
44 |
| - /// Query number. If not specified, runs all queries |
45 |
| - #[structopt(short, long)] |
46 |
| - query: Option<usize>, |
| 34 | +#[structopt(about = "benchmark command")] |
| 35 | +#[allow(dead_code)] |
| 36 | +enum Options { |
| 37 | + Clickbench(clickbench::RunOpt), |
| 38 | + H2o(h2o::RunOpt), |
| 39 | + Imdb(imdb::RunOpt), |
| 40 | + SortTpch(sort_tpch::RunOpt), |
| 41 | + Tpch(tpch::RunOpt), |
47 | 42 | }
|
48 | 43 |
|
49 | 44 | #[tokio::main]
|
50 | 45 | pub async fn main() -> Result<()> {
|
51 | 46 | // 1. parse args and check which benchmarks should be run
|
52 |
| - let opt = MemProfileOpt::from_args(); |
| 47 | + // let opt = MemProfileOpt::from_args(); |
| 48 | + let profile = env::var("PROFILE").unwrap_or_else(|_| "release".to_string()); |
| 49 | + |
| 50 | + let args = env::args().skip(1); |
| 51 | + // let opt = Options::from_iter(args); |
| 52 | + let query_range = match Options::from_args() { |
| 53 | + // TODO clickbench |
| 54 | + // TODO run for specific query id |
| 55 | + Options::Clickbench(_) => 0..=42, |
| 56 | + Options::H2o(opt) => { |
| 57 | + let queries = AllQueries::try_new(&opt.queries_path)?; |
| 58 | + match opt.query { |
| 59 | + Some(query_id) => query_id..=query_id, |
| 60 | + None => queries.min_query_id()..=queries.max_query_id(), |
| 61 | + } |
| 62 | + } |
| 63 | + Options::Imdb(_) => imdb::IMDB_QUERY_START_ID..=imdb::IMDB_QUERY_END_ID, |
| 64 | + Options::SortTpch(_) => { |
| 65 | + sort_tpch::SORT_TPCH_QUERY_START_ID..=sort_tpch::SORT_TPCH_QUERY_END_ID |
| 66 | + } |
| 67 | + Options::Tpch(_) => tpch::TPCH_QUERY_START_ID..=tpch::TPCH_QUERY_END_ID, |
| 68 | + }; |
53 | 69 |
|
54 | 70 | // 2. prebuild test binary so that memory does not blow up due to build process
|
55 |
| - // check binary file location |
56 | 71 | println!("Pre-building benchmark binary...");
|
57 | 72 | let status = Command::new("cargo")
|
58 | 73 | .args([
|
59 | 74 | "build",
|
60 | 75 | "--profile",
|
61 |
| - "release-nonlto", |
| 76 | + &profile, |
62 | 77 | "--features",
|
63 | 78 | "mimalloc_extended",
|
64 | 79 | "--bin",
|
65 | 80 | "dfbench",
|
66 | 81 | ])
|
67 | 82 | .status()
|
68 | 83 | .expect("Failed to build dfbench");
|
69 |
| - |
70 |
| - if !status.success() { |
71 |
| - panic!("Failed to build dfbench"); |
72 |
| - } |
| 84 | + assert!(status.success()); |
73 | 85 | println!("Benchmark binary built successfully.");
|
74 | 86 |
|
75 |
| - // 3. create a subprocess, run each benchmark with args (1) (2) |
76 |
| - match opt.command { |
77 |
| - BenchmarkCommand::Tpch(tpch_opt) => { |
78 |
| - run_tpch_benchmark(tpch_opt).await?; |
79 |
| - } |
80 |
| - } |
| 87 | + // 3. spawn a new process per each benchmark query and print summary |
| 88 | + let mut dfbench_args: Vec<String> = args.collect(); |
| 89 | + println!("{dfbench_args:?}"); |
| 90 | + run_benchmark_as_child_process(&profile, query_range, &mut dfbench_args)?; |
81 | 91 |
|
82 |
| - // (maybe we cannot support result file.. and just have to print..) |
83 | 92 | Ok(())
|
84 | 93 | }
|
85 | 94 |
|
86 |
| -async fn run_tpch_benchmark(opt: TpchOpt) -> Result<()> { |
87 |
| - let mut args: Vec<String> = vec![ |
88 |
| - "./target/release-nonlto/dfbench".to_string(), |
89 |
| - "tpch".to_string(), |
90 |
| - "--iterations".to_string(), |
91 |
| - "1".to_string(), |
92 |
| - "--path".to_string(), |
93 |
| - opt.path.clone(), |
94 |
| - "--format".to_string(), |
95 |
| - "parquet".to_string(), |
96 |
| - "--partitions".to_string(), |
97 |
| - "4".to_string(), |
98 |
| - "--query".to_string(), |
99 |
| - ]; |
100 |
| - |
| 95 | +fn run_benchmark_as_child_process( |
| 96 | + profile: &str, |
| 97 | + query_range: std::ops::RangeInclusive<usize>, |
| 98 | + args: &mut Vec<String>, |
| 99 | +) -> Result<()> { |
101 | 100 | let mut query_strings: Vec<String> = Vec::new();
|
102 |
| - if let Some(query_id) = opt.query { |
103 |
| - query_strings.push(query_id.to_string()); |
104 |
| - } else { |
105 |
| - // run all queries. |
106 |
| - for i in 1..=22 { |
107 |
| - query_strings.push(i.to_string()); |
108 |
| - } |
| 101 | + for i in query_range { |
| 102 | + query_strings.push(i.to_string()); |
109 | 103 | }
|
110 | 104 |
|
| 105 | + let command = format!("target/{profile}/dfbench"); |
| 106 | + args.insert(0, command); |
| 107 | + args.push("--query".to_string()); |
| 108 | + |
111 | 109 | let mut results = vec![];
|
112 | 110 | for query_str in query_strings {
|
113 | 111 | args.push(query_str);
|
114 |
| - let _ = run_query(&args, &mut results); |
| 112 | + let _ = run_query(args, &mut results); |
115 | 113 | args.pop();
|
116 | 114 | }
|
117 | 115 |
|
|
0 commit comments