
Commit da2f9e1

refactor: simplify json_shredding example by using ListingTable (#17369)
Signed-off-by: Ruihang Xia <[email protected]>
1 parent 3abee73 commit da2f9e1

File tree: 1 file changed (+20 −110 lines)


datafusion-examples/examples/json_shredding.rs

Lines changed: 20 additions & 110 deletions
@@ -20,35 +20,29 @@ use std::sync::Arc;
 
 use arrow::array::{RecordBatch, StringArray};
 use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
-use async_trait::async_trait;
 
 use datafusion::assert_batches_eq;
-use datafusion::catalog::memory::DataSourceExec;
-use datafusion::catalog::{Session, TableProvider};
 use datafusion::common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
-use datafusion::common::{assert_contains, DFSchema, Result};
-use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::common::{assert_contains, Result};
+use datafusion::datasource::listing::{
+    ListingTable, ListingTableConfig, ListingTableUrl,
+};
 use datafusion::execution::context::SessionContext;
 use datafusion::execution::object_store::ObjectStoreUrl;
-use datafusion::logical_expr::utils::conjunction;
 use datafusion::logical_expr::{
-    ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
-    TableProviderFilterPushDown, TableType, Volatility,
+    ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
 };
 use datafusion::parquet::arrow::ArrowWriter;
 use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::{lit, SessionConfig};
+use datafusion::prelude::SessionConfig;
 use datafusion::scalar::ScalarValue;
 use datafusion_physical_expr_adapter::{
     DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
 };
-use futures::StreamExt;
 use object_store::memory::InMemory;
 use object_store::path::Path;
 use object_store::{ObjectStore, PutPayload};
@@ -95,23 +89,29 @@ async fn main() -> Result<()> {
     let payload = PutPayload::from_bytes(buf.into());
     store.put(&path, payload).await?;
 
-    // Create a custom table provider that rewrites struct field access
-    let table_provider = Arc::new(ExampleTableProvider::new(table_schema));
-
     // Set up query execution
     let mut cfg = SessionConfig::new();
     cfg.options_mut().execution.parquet.pushdown_filters = true;
     let ctx = SessionContext::new_with_config(cfg);
-
-    // Register our table
-    ctx.register_table("structs", table_provider)?;
-    ctx.register_udf(ScalarUDF::new_from_impl(JsonGetStr::default()));
-
     ctx.runtime_env().register_object_store(
         ObjectStoreUrl::parse("memory://")?.as_ref(),
         Arc::new(store),
     );
 
+    // Create a custom table provider that rewrites struct field access
+    let listing_table_config =
+        ListingTableConfig::new(ListingTableUrl::parse("memory:///example.parquet")?)
+            .infer_options(&ctx.state())
+            .await?
+            .with_schema(table_schema)
+            .with_expr_adapter_factory(Arc::new(ShreddedJsonRewriterFactory));
+    let table = ListingTable::try_new(listing_table_config).unwrap();
+    let table_provider = Arc::new(table);
+
+    // Register our table
+    ctx.register_table("structs", table_provider)?;
+    ctx.register_udf(ScalarUDF::new_from_impl(JsonGetStr::default()));
+
     println!("\n=== Showing all data ===");
     let batches = ctx.sql("SELECT * FROM structs").await?.collect().await?;
     arrow::util::pretty::print_batches(&batches)?;
@@ -191,96 +191,6 @@ fn create_sample_record_batch(file_schema: &Schema) -> RecordBatch {
     .unwrap()
 }
 
-/// Custom TableProvider that uses a StructFieldRewriter
-#[derive(Debug)]
-struct ExampleTableProvider {
-    schema: SchemaRef,
-}
-
-impl ExampleTableProvider {
-    fn new(schema: SchemaRef) -> Self {
-        Self { schema }
-    }
-}
-
-#[async_trait]
-impl TableProvider for ExampleTableProvider {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-
-    fn table_type(&self) -> TableType {
-        TableType::Base
-    }
-
-    fn supports_filters_pushdown(
-        &self,
-        filters: &[&Expr],
-    ) -> Result<Vec<TableProviderFilterPushDown>> {
-        // Implementers can choose to mark these filters as exact or inexact.
-        // If marked as exact they cannot have false positives and must always be applied.
-        // If marked as Inexact they can have false positives and at runtime the rewriter
-        // can decide to not rewrite / ignore some filters since they will be re-evaluated upstream.
-        // For the purposes of this example we mark them as Exact to demonstrate the rewriter is working and the filtering is not being re-evaluated upstream.
-        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
-    }
-
-    async fn scan(
-        &self,
-        state: &dyn Session,
-        projection: Option<&Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let schema = self.schema.clone();
-        let df_schema = DFSchema::try_from(schema.clone())?;
-        let filter = state.create_physical_expr(
-            conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true)),
-            &df_schema,
-        )?;
-
-        let parquet_source = ParquetSource::default()
-            .with_predicate(filter)
-            .with_pushdown_filters(true);
-
-        let object_store_url = ObjectStoreUrl::parse("memory://")?;
-
-        let store = state.runtime_env().object_store(object_store_url)?;
-
-        let mut files = vec![];
-        let mut listing = store.list(None);
-        while let Some(file) = listing.next().await {
-            if let Ok(file) = file {
-                files.push(file);
-            }
-        }
-
-        let file_group = files
-            .iter()
-            .map(|file| PartitionedFile::new(file.location.clone(), file.size))
-            .collect();
-
-        let file_scan_config = FileScanConfigBuilder::new(
-            ObjectStoreUrl::parse("memory://")?,
-            schema,
-            Arc::new(parquet_source),
-        )
-        .with_projection(projection.cloned())
-        .with_limit(limit)
-        .with_file_group(file_group)
-        // if the rewriter needs a reference to the table schema you can bind self.schema() here
-        .with_expr_adapter(Some(Arc::new(ShreddedJsonRewriterFactory) as _));
-
-        Ok(Arc::new(DataSourceExec::new(Arc::new(
-            file_scan_config.build(),
-        ))))
-    }
-}
-
 /// Scalar UDF that uses serde_json to access json fields
 #[derive(Debug, PartialEq, Eq, Hash)]
 pub struct JsonGetStr {
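
For reference, a minimal sketch of the setup this commit arrives at, condensed from the hunks above. It assumes the table_schema, the populated in-memory object store, and the ShreddedJsonRewriterFactory adapter factory defined elsewhere in json_shredding.rs; the wrapper function name register_structs_table is chosen here for illustration only.

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::common::Result;
use datafusion::datasource::listing::{
    ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::execution::context::SessionContext;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::prelude::SessionConfig;
use object_store::ObjectStore;

// Hypothetical wrapper around the registration steps shown in the diff above.
async fn register_structs_table(
    table_schema: SchemaRef,
    store: Arc<dyn ObjectStore>, // in-memory store already holding example.parquet
) -> Result<SessionContext> {
    // Enable Parquet filter pushdown so rewritten predicates reach the scan.
    let mut cfg = SessionConfig::new();
    cfg.options_mut().execution.parquet.pushdown_filters = true;
    let ctx = SessionContext::new_with_config(cfg);

    // Back the memory:// URL used by the listing table with the object store.
    ctx.runtime_env()
        .register_object_store(ObjectStoreUrl::parse("memory://")?.as_ref(), store);

    // ListingTable now does what the removed ExampleTableProvider did by hand:
    // list the file, build the Parquet scan, and attach the expression adapter.
    let config =
        ListingTableConfig::new(ListingTableUrl::parse("memory:///example.parquet")?)
            .infer_options(&ctx.state())
            .await?
            .with_schema(table_schema)
            // ShreddedJsonRewriterFactory is defined later in json_shredding.rs
            .with_expr_adapter_factory(Arc::new(ShreddedJsonRewriterFactory));
    ctx.register_table("structs", Arc::new(ListingTable::try_new(config)?))?;
    Ok(ctx)
}

Compared with the roughly 90-line ExampleTableProvider it replaces, the builder carries the same pieces (table schema, Parquet filter pushdown, expression adapter factory) in a handful of calls, which is where the +20/−110 line count comes from. The example itself is typically run with cargo run --example json_shredding from the datafusion-examples crate.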

0 commit comments
