
Commit ac4466d

[Feature/datafusion] Add SingleNodeTestCase with clickbench queries (#19953)
* Handle empty flush and ignore mapped types not defined in vsr
* Add setup to run clickbench tests as SingleNodeTestCases
* Fix test name to match convention
* Fix resource loading when executing DataFusionSingleNodeTests outside of IntelliJ
* Bump parquet version to match data format plugin
* Add 41 clickbench queries as DSL - fix 25 to not blow up the cluster
* Fix build

Signed-off-by: Marc Handalian <[email protected]>
1 parent 31308ab commit ac4466d

50 files changed: +492 −6 lines


modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/fields/ParquetField.java

Lines changed: 5 additions & 1 deletion

@@ -75,7 +75,11 @@ public final void createField(final MappedFieldType mappedFieldType,
         Objects.requireNonNull(managedVSR, "ManagedVSR cannot be null");

         if (mappedFieldType.isColumnar()) {
-            addToGroup(mappedFieldType, managedVSR, parseValue);
+            // TODO: support dynamic mapping update
+            // for now ignore the field
+            if (managedVSR.getVector(mappedFieldType.name()) != null) {
+                addToGroup(mappedFieldType, managedVSR, parseValue);
+            }
         }
     }
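
The guard above skips columnar fields that have no vector registered in the ManagedVSR, so a field introduced by a dynamic mapping update is silently ignored instead of breaking the write. A minimal, self-contained sketch of that pattern, using hypothetical stand-in types (Field, VectorRegistry) rather than the module's real MappedFieldType and ManagedVSR:

import java.util.HashMap;
import java.util.Map;

// Illustration only: Field and VectorRegistry are hypothetical stand-ins for
// MappedFieldType and ManagedVSR; the real addToGroup(...) call is elided.
final class SkipUnknownFieldSketch {

    record Field(String name, boolean columnar) {}

    static final class VectorRegistry {
        private final Map<String, Object> vectors = new HashMap<>();

        void register(String fieldName) {
            vectors.put(fieldName, new Object());
        }

        Object getVector(String fieldName) {
            return vectors.get(fieldName);
        }
    }

    // Mirrors the diff: only columnar fields with a registered vector are added;
    // anything else is ignored until dynamic mapping updates are supported.
    static boolean createField(Field field, VectorRegistry vsr) {
        if (field.columnar() && vsr.getVector(field.name()) != null) {
            return true; // addToGroup(...) would run here
        }
        return false;
    }

    public static void main(String[] args) {
        VectorRegistry vsr = new VectorRegistry();
        vsr.register("Title");
        System.out.println(createField(new Field("Title", true), vsr));           // true
        System.out.println(createField(new Field("NewDynamicField", true), vsr)); // false, silently skipped
    }
}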

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/writer/ParquetWriter.java

Lines changed: 4 additions & 0 deletions

@@ -54,6 +54,10 @@ public WriteResult addDoc(ParquetDocumentInput d) throws IOException {
     public FileInfos flush(FlushIn flushIn) throws IOException {
         String fileName = vsrManager.flush(flushIn);
         FileInfos fileInfos = new FileInfos();
+        // no data flushed
+        if (fileName == null) {
+            return fileInfos;
+        }
         WriterFileSet writerFileSet = new WriterFileSet(Path.of(fileName).getParent(), writerGeneration);
         writerFileSet.add(fileName);
         fileInfos.putWriterFileSet(PARQUET_DATA_FORMAT, writerFileSet);
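
With this change, flush becomes a no-op when the VSR manager had nothing buffered: the caller gets back an empty FileInfos instead of an attempt to build a WriterFileSet from a null file name. A small sketch of that contract, using a hypothetical simplified result type rather than the module's real FileInfos and WriterFileSet classes:

import java.util.ArrayList;
import java.util.List;

// Illustration only: FlushResult is a hypothetical stand-in for FileInfos.
final class EmptyFlushSketch {

    static final class FlushResult {
        final List<String> files = new ArrayList<>();

        boolean isEmpty() {
            return files.isEmpty();
        }
    }

    // Mirrors the diff: a null file name means no data was flushed, so an
    // empty result is returned instead of a bogus file entry.
    static FlushResult flush(String flushedFileName) {
        FlushResult result = new FlushResult();
        if (flushedFileName == null) {
            return result;
        }
        result.files.add(flushedFileName);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(flush(null).isEmpty());                  // true: empty flush
        System.out.println(flush("gen0/part-0.parquet").isEmpty()); // false: one file recorded
    }
}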

plugins/engine-datafusion/Cargo.toml

Lines changed: 1 addition & 1 deletion

@@ -40,7 +40,7 @@ thiserror = "1.0"
 # Logging
 log = "0.4"
 # Parquet support
-parquet = "53.0.0"
+parquet = "54.0.0"

 # Object store for file access
 object_store = "=0.12.3"

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 2 additions & 0 deletions

@@ -385,6 +385,8 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
     };

     let dataframe = ctx.execute_logical_plan(logical_plan).await.expect("Failed to execute logical plan");
+    let physical_plan = dataframe.clone().create_physical_plan().await.unwrap();
+    println!("Physical Plan:\n{}", datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(true));

     let stream = match dataframe.execute_stream().await {
         Ok(stream) => { stream }

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java

Lines changed: 0 additions & 2 deletions

@@ -8,8 +8,6 @@

 package org.opensearch.datafusion;

-import org.opensearch.datafusion.search.QueryCallback;
-
 /**
  * JNI wrapper for DataFusion operations
  */
DataFusionSingleNodeTests.java

Lines changed: 113 additions & 0 deletions

@@ -0,0 +1,113 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.datafusion;
+
+import com.parquet.parquetdataformat.ParquetDataFormatPlugin;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.core.xcontent.MediaTypeRegistry;
+import org.opensearch.core.xcontent.XContentParser;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.OpenSearchSingleNodeTestCase;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
+public class DataFusionSingleNodeTests extends OpenSearchSingleNodeTestCase {
+
+    private static final String INDEX_MAPPING_JSON = "clickbench_index_mapping.json";
+    private static final String DATA = "clickbench.json";
+    private final String indexName = "hits";
+
+    @Override
+    protected Collection<Class<? extends Plugin>> getPlugins() {
+        return List.of(DataFusionPlugin.class, ParquetDataFormatPlugin.class);
+    }
+
+    public void testClickBenchQueries() throws IOException {
+        String mappings = fileToString(
+            INDEX_MAPPING_JSON,
+            false
+        );
+        createIndexWithMappingSource(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put("index.refresh_interval", -1)
+                .build(),
+            mappings
+        );
+        String req = fileToString(
+            DATA,
+            false
+        );
+        System.out.println(req.trim());
+        client().prepareIndex("hits").setSource(req, MediaTypeRegistry.JSON).get();
+        client().admin().indices().prepareRefresh().get();
+        client().admin().indices().prepareFlush().get();
+        client().admin().indices().prepareFlush().get();
+
+        // TODO: run in a loop
+        String sourceFile = fileToString(
+            "q25.json",
+            false
+        );
+        SearchSourceBuilder source = new SearchSourceBuilder();
+        XContentParser parser = createParser(JsonXContent.jsonXContent,
+            sourceFile);
+        source.parseXContent(parser);
+        SearchResponse response = client().prepareSearch(indexName).setSource(source).get();
+        // TODO: Match expected results...
+        System.out.println(response);
+    }
+
+    static String getResourceFilePath(String relPath) {
+        return DataFusionSingleNodeTests.class.getClassLoader().getResource(relPath).getPath();
+    }
+
+    static String fileToString(
+        final String filePathFromProjectRoot, final boolean removeNewLines) throws IOException {
+
+        final String absolutePath = getResourceFilePath(filePathFromProjectRoot);
+
+        try (final InputStream stream = new FileInputStream(absolutePath);
+            final Reader streamReader = new InputStreamReader(stream, StandardCharsets.UTF_8);
+            final BufferedReader br = new BufferedReader(streamReader)) {
+
+            final StringBuilder stringBuilder = new StringBuilder();
+            String line = br.readLine();
+
+            while (line != null) {
+
+                stringBuilder.append(line);
+                if (!removeNewLines) {
+                    stringBuilder.append(String.format(Locale.ROOT, "%n"));
+                }
+                line = br.readLine();
+            }
+
+            return stringBuilder.toString();
+        }
+    }
+
+}
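
The "run in a loop" TODO in testClickBenchQueries points at the 41 ClickBench DSL resources this commit adds. A hedged sketch of what that loop could look like as a helper inside the same test class, to be called after the index has been populated; the q<N>.json naming scheme is an assumption based on the single q25.json reference above:

    // Hypothetical follow-up for the "run in a loop" TODO: execute every
    // ClickBench DSL resource once the hits index has been indexed and flushed.
    // The q<N>.json naming is an assumption; only q25.json is confirmed here.
    private void runAllClickBenchQueries() throws IOException {
        for (int i = 1; i <= 41; i++) {
            String queryFile = String.format(Locale.ROOT, "q%d.json", i);
            String dsl = fileToString(queryFile, false);
            SearchSourceBuilder source = new SearchSourceBuilder();
            try (XContentParser parser = createParser(JsonXContent.jsonXContent, dsl)) {
                source.parseXContent(parser);
            }
            SearchResponse response = client().prepareSearch(indexName).setSource(source).get();
            // TODO: compare against expected ClickBench results per query
            assertNotNull("no response for " + queryFile, response);
        }
    }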
Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+{"WatchID":"9110818468285196899","JavaEnable":0,"Title":"","GoodEvent":1,"EventTime":"2013-07-14 20:38:47","EventDate":"2013-07-15","CounterID":17,"ClientIP":-1216690514,"RegionID":839,"UserID":"-2461439046089301801","CounterClass":0,"OS":0,"UserAgent":0,"URL":"","Referer":"https://example.org/about","IsRefresh":0,"RefererCategoryID":0,"RefererRegionID":0,"URLCategoryID":0,"URLRegionID":0,"ResolutionWidth":0,"ResolutionHeight":0,"ResolutionDepth":0,"FlashMajor":0,"FlashMinor":0,"FlashMinor2":"","NetMajor":0,"NetMinor":0,"UserAgentMajor":0,"UserAgentMinor":"�O","CookieEnable":0,"JavascriptEnable":0,"IsMobile":0,"MobilePhone":0,"MobilePhoneModel":"","Params":"","IPNetworkID":3793327,"TraficSourceID":4,"SearchEngineID":0,"SearchPhrase":"ha","AdvEngineID":0,"IsArtifical":0,"WindowClientWidth":0,"WindowClientHeight":0,"ClientTimeZone":-1,"ClientEventTime":"1971-01-01 14:16:06","SilverlightVersion1":0,"SilverlightVersion2":0,"SilverlightVersion3":0,"SilverlightVersion4":0,"PageCharset":"","CodeVersion":0,"IsLink":0,"IsDownload":0,"IsNotBounce":0,"FUniqID":"0","OriginalURL":"","HID":0,"IsOldCounter":0,"IsEvent":0,"IsParameter":0,"DontCountHits":0,"WithHash":0,"HitColor":"5","LocalEventTime":"2013-07-15 10:47:34","Age":0,"Sex":0,"Income":0,"Interests":0,"Robotness":0,"RemoteIP":-1001831330,"WindowName":-1,"OpenerName":-1,"HistoryLength":-1,"BrowserLanguage":"�","BrowserCountry":"�\f","SocialNetwork":"","SocialAction":"","HTTPError":0,"SendTiming":0,"DNSTiming":0,"ConnectTiming":0,"ResponseStartTiming":0,"ResponseEndTiming":0,"FetchTiming":0,"SocialSourceNetworkID":0,"SocialSourcePage":"","ParamPrice":"0","ParamOrderID":"","ParamCurrency":"NH\u001C","ParamCurrencyID":0,"OpenstatServiceName":"","OpenstatCampaignID":"","OpenstatAdID":"","OpenstatSourceID":"","UTMSource":"","UTMMedium":"","UTMCampaign":"","UTMContent":"","UTMTerm":"","FromTag":"","HasGCLID":0,"RefererHash":"-296158784638538920","URLHash":"-8417682003818480435","CLID":0}
