Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions datafusion-cli/src/object_storage/instrumented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub enum InstrumentedObjectStoreMode {

impl fmt::Display for InstrumentedObjectStoreMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
write!(f, "{self:?}")
}
}

Expand Down Expand Up @@ -426,7 +426,7 @@ pub enum Operation {

impl fmt::Display for Operation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
write!(f, "{self:?}")
}
}

Expand Down Expand Up @@ -556,11 +556,11 @@ impl RequestSummaries {
let size_stats = s.size_stats.as_ref();
let dur_avg = duration_stats.map(|d| {
let avg = d.sum.as_secs_f32() / count;
format!("{:.6}s", avg)
format!("{avg:.6}s")
});
let size_avg = size_stats.map(|s| {
let avg = s.sum as f32 / count;
format!("{} B", avg)
format!("{avg} B")
});
[dur_avg, size_avg]
})
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/print_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl PrintOptions {

writeln!(writer, "Summaries:")?;
let summaries = RequestSummaries::new(&requests);
writeln!(writer, "{}", summaries)?;
writeln!(writer, "{summaries}")?;
}
}
}
Expand Down
47 changes: 32 additions & 15 deletions datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,24 +269,41 @@ impl TableReference {
}

/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
/// identifier. See docs on [`TableReference`] for more details.
/// identifier, normalizing `s` to lowercase.
/// See docs on [`TableReference`] for more details.
pub fn parse_str(s: &str) -> Self {
let mut parts = parse_identifiers_normalized(s, false);
Self::parse_str_normalized(s, false)
}

/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
/// identifier, normalizing `s` to lowercase if `ignore_case` is `false`.
/// See docs on [`TableReference`] for more details.
pub fn parse_str_normalized(s: &str, ignore_case: bool) -> Self {
let table_parts = parse_identifiers_normalized(s, ignore_case);

Self::from_vec(table_parts).unwrap_or_else(|| Self::Bare { table: s.into() })
}

/// Consume a vector of identifier parts to compose a [`TableReference`]. The input vector
/// should contain 1 <= N <= 3 elements in the following sequence:
/// ```no_rust
/// [<catalog>, <schema>, table]
/// ```
fn from_vec(mut parts: Vec<String>) -> Option<Self> {
match parts.len() {
1 => Self::Bare {
table: parts.remove(0).into(),
},
2 => Self::Partial {
schema: parts.remove(0).into(),
table: parts.remove(0).into(),
},
3 => Self::Full {
catalog: parts.remove(0).into(),
schema: parts.remove(0).into(),
table: parts.remove(0).into(),
},
_ => Self::Bare { table: s.into() },
1 => Some(Self::Bare {
table: parts.pop()?.into(),
}),
2 => Some(Self::Partial {
table: parts.pop()?.into(),
schema: parts.pop()?.into(),
}),
3 => Some(Self::Full {
table: parts.pop()?.into(),
schema: parts.pop()?.into(),
catalog: parts.pop()?.into(),
}),
_ => None,
}
}

Expand Down
3 changes: 3 additions & 0 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
Ok(idents)
}

/// Parse a string into a vector of identifiers.
///
/// Note: If ignore_case is false, the string will be normalized to lowercase.
#[cfg(feature = "sql")]
pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
parse_identifiers(s)
Expand Down
13 changes: 8 additions & 5 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,17 @@ where
}
}

impl From<protobuf::ColumnRelation> for TableReference {
fn from(rel: protobuf::ColumnRelation) -> Self {
Self::parse_str_normalized(rel.relation.as_str(), true)
}
}

impl From<protobuf::Column> for Column {
fn from(c: protobuf::Column) -> Self {
let protobuf::Column { relation, name } = c;

Self::new(relation.map(|r| r.relation), name)
Self::new(relation, name)
}
}

Expand All @@ -164,10 +170,7 @@ impl TryFrom<&protobuf::DfSchema> for DFSchema {
.map(|df_field| {
let field: Field = df_field.field.as_ref().required("field")?;
Ok((
df_field
.qualifier
.as_ref()
.map(|q| q.relation.clone().into()),
df_field.qualifier.as_ref().map(|q| q.clone().into()),
Arc::new(field),
))
})
Expand Down
6 changes: 2 additions & 4 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,7 @@ pub fn parse_expr(
let null_treatment = protobuf::NullTreatment::try_from(null_treatment)
.map_err(|_| {
proto_error(format!(
"Received a WindowExprNode message with unknown NullTreatment {}",
null_treatment
"Received a WindowExprNode message with unknown NullTreatment {null_treatment}",
))
})?;
Some(NullTreatment::from(null_treatment))
Expand Down Expand Up @@ -596,8 +595,7 @@ pub fn parse_expr(
let null_treatment = protobuf::NullTreatment::try_from(null_treatment)
.map_err(|_| {
proto_error(format!(
"Received an AggregateUdfExprNode message with unknown NullTreatment {}",
null_treatment
"Received an AggregateUdfExprNode message with unknown NullTreatment {null_treatment}",
))
})?;
Some(NullTreatment::from(null_treatment))
Expand Down
46 changes: 46 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2804,3 +2804,49 @@ async fn roundtrip_arrow_scan() -> Result<()> {
assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
Ok(())
}

#[tokio::test]
async fn roundtrip_mixed_case_table_reference() -> Result<()> {
// Prepare "client" database
let client_ctx = SessionContext::new_with_config(
SessionConfig::new()
.set_bool("datafusion.sql_parser.enable_ident_normalization", false),
);
client_ctx
.register_csv(
"\"TestData\"",
"tests/testdata/test.csv",
CsvReadOptions::default(),
)
.await?;

// Prepare "server" database
let server_ctx = SessionContext::new_with_config(
SessionConfig::new()
.set_bool("datafusion.sql_parser.enable_ident_normalization", false),
);
server_ctx
.register_csv(
"\"TestData\"",
"tests/testdata/test.csv",
CsvReadOptions::default(),
)
.await?;

// Create a logical plan, serialize it (client), then deserialize it (server)
let dataframe = client_ctx
.sql("SELECT a FROM TestData WHERE TestData.a = 1")
.await?;

let client_logical_plan = dataframe.into_optimized_plan()?;
let plan_bytes = logical_plan_to_bytes(&client_logical_plan)?;
let server_logical_plan =
logical_plan_from_bytes(&plan_bytes, &server_ctx.task_ctx())?;

assert_eq!(
format!("{}", client_logical_plan.display_indent_schema()),
format!("{}", server_logical_plan.display_indent_schema())
);

Ok(())
}