Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,24 +268,48 @@ impl TableReference {
}

/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
/// identifier. See docs on [`TableReference`] for more details.
/// identifier, normalizing `s` to lowercase.
/// See docs on [`TableReference`] for more details.
pub fn parse_str(s: &str) -> Self {
let mut parts = parse_identifiers_normalized(s, false);
Self::parse_str_normalized(s, false)
}

/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
/// identifier, normalizing `s` to lowercase if `ignore_case` is `false`.
/// See docs on [`TableReference`] for more details.
pub fn parse_str_normalized(s: &str, ignore_case: bool) -> Self {
let table_parts = parse_identifiers_normalized(s, ignore_case);

Self::from_vec(table_parts)
.unwrap_or(Self::Bare { table: s.into() })
}

/// Compose a [`TableReference`] from separate parts. The input vector should contain
/// at most three elements in the following sequence:
/// ```no_rust
/// [<catalog>, <schema>, table]
/// ```
pub fn from_vec(mut parts: Vec<String>) -> Option<Self> {
match parts.len() {
1 => Self::Bare {
table: parts.remove(0).into(),
},
2 => Self::Partial {
schema: parts.remove(0).into(),
table: parts.remove(0).into(),
},
3 => Self::Full {
catalog: parts.remove(0).into(),
schema: parts.remove(0).into(),
table: parts.remove(0).into(),
},
_ => Self::Bare { table: s.into() },
1 => Some(
Self::Bare {
table: parts.remove(0).into(),
}
),
2 => Some(
Self::Partial {
schema: parts.remove(0).into(),
table: parts.remove(0).into(),
}
),
3 => Some(
Self::Full {
catalog: parts.remove(0).into(),
schema: parts.remove(0).into(),
table: parts.remove(0).into(),
}
),
_ => None,
}
}

Expand Down
3 changes: 3 additions & 0 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
Ok(idents)
}

/// Parse a string into a vector of identifiers.
///
/// Note: If ignore_case is false, the string will be normalized to lowercase.
pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
parse_identifiers(s)
.unwrap_or_default()
Expand Down
16 changes: 14 additions & 2 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,23 @@ where
}
}

impl From<protobuf::ColumnRelation> for TableReference {
fn from(rel: protobuf::ColumnRelation) -> Self {
Self::parse_str_normalized(rel.relation.as_str(), true)
}
}

impl From<&protobuf::ColumnRelation> for TableReference {
fn from(rel: &protobuf::ColumnRelation) -> Self {
rel.clone().into()
}
}

impl From<protobuf::Column> for Column {
fn from(c: protobuf::Column) -> Self {
let protobuf::Column { relation, name } = c;

Self::new(relation.map(|r| r.relation), name)
Self::new(relation, name)
}
}

Expand All @@ -167,7 +179,7 @@ impl TryFrom<&protobuf::DfSchema> for DFSchema {
df_field
.qualifier
.as_ref()
.map(|q| q.relation.clone().into()),
.map(|q| q.into()),
Arc::new(field),
))
})
Expand Down
38 changes: 38 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2656,3 +2656,41 @@ async fn roundtrip_custom_listing_tables_schema() -> Result<()> {
assert_eq!(plan, new_plan);
Ok(())
}

#[tokio::test]
async fn roundtrip_mixed_case_table_reference() -> Result<()> {
// Prepare "client" database
let client_ctx = SessionContext::new_with_config(SessionConfig::new()
.set_bool("datafusion.sql_parser.enable_ident_normalization", false)
);
client_ctx.register_csv(
"\"TestData\"",
"tests/testdata/test.csv",
CsvReadOptions::default()
)
.await?;

// Prepare "server" database
let server_ctx = SessionContext::new_with_config(SessionConfig::new()
.set_bool("datafusion.sql_parser.enable_ident_normalization", false)
);
server_ctx.register_csv(
"\"TestData\"",
"tests/testdata/test.csv",
CsvReadOptions::default()
)
.await?;

// Create a logical plan, serialize it (client), then deserialize it (server)
let dataframe = client_ctx.sql("SELECT a FROM TestData WHERE TestData.a = 1").await?;
let client_logical_plan = dataframe.into_optimized_plan()?;
let plan_bytes = logical_plan_to_bytes(&client_logical_plan)?;
let server_logical_plan = logical_plan_from_bytes(&plan_bytes, &server_ctx)?;

assert_eq!(
format!("{}", client_logical_plan.display_indent_schema()),
format!("{}", server_logical_plan.display_indent_schema())
);

Ok(())
}