Skip to content

Commit 3dbc42c

Browse files
committed
fix getSchema
1 parent ee18306 commit 3dbc42c

File tree

24 files changed

+159
-291
lines changed

24 files changed

+159
-291
lines changed

aurora-mysql-plugin/src/test/java/io/cdap/plugin/aurora/mysql/AuroraMysqlSourceTestRun.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -329,9 +329,7 @@ public void testNonExistentDBTable() throws Exception {
329329
.addConnection(sourceBadName.getName(), sink.getName())
330330
.build();
331331
ApplicationId appId = NamespaceId.DEFAULT.app("dbSourceNonExistingTest");
332-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
333-
"ETL Application with DB Source should have failed because of a " +
334-
"non-existent source table.", 1);
332+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
335333

336334
// Bad connection
337335
ETLPlugin sourceBadConnConfig = new ETLPlugin(
@@ -357,8 +355,6 @@ public void testNonExistentDBTable() throws Exception {
357355
.addStage(sink)
358356
.addConnection(sourceBadConn.getName(), sink.getName())
359357
.build();
360-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
361-
"ETL Application with DB Source should have failed because of a " +
362-
"non-existent source database.", 2);
358+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
363359
}
364360
}

aurora-mysql-plugin/widgets/AuroraMysql-batchsource.json

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,32 +41,16 @@
4141
"label": "Database",
4242
"name": "database"
4343
},
44+
{
45+
"widget-type": "get-schema",
46+
"widget-category": "plugin"
47+
},
4448
{
4549
"widget-type": "textarea",
4650
"label": "Import Query",
4751
"name": "importQuery",
4852
"widget-attributes": {
4953
"rows": "4"
50-
},
51-
"plugin-function": {
52-
"widget": "outputSchema",
53-
"output-property": "schema",
54-
"omit-properties": [
55-
{
56-
"name": "schema"
57-
}
58-
],
59-
"add-properties": [
60-
{
61-
"name": "query",
62-
"plugin-property-for-value": "importQuery",
63-
"widget-type": "textarea",
64-
"label": "Query",
65-
"widget-attributes": {
66-
"rows": "4"
67-
}
68-
}
69-
]
7054
}
7155
},
7256
{

aurora-postgresql-plugin/src/test/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSourceTestRun.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,7 @@ public void testNonExistentDBTable() throws Exception {
299299
.addConnection(sourceBadName.getName(), sink.getName())
300300
.build();
301301
ApplicationId appId = NamespaceId.DEFAULT.app("dbSourceNonExistingTest");
302-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
303-
"ETL Application with DB Source should have failed because of a " +
304-
"non-existent source table.", 1);
302+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
305303

306304
// Bad connection
307305
ETLPlugin sourceBadConnConfig = new ETLPlugin(
@@ -326,8 +324,6 @@ public void testNonExistentDBTable() throws Exception {
326324
.addStage(sink)
327325
.addConnection(sourceBadConn.getName(), sink.getName())
328326
.build();
329-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
330-
"ETL Application with DB Source should have failed because of a " +
331-
"non-existent source database.", 2);
327+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
332328
}
333329
}

aurora-postgresql-plugin/widgets/AuroraPostgres-batchsource.json

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,32 +41,16 @@
4141
"label": "Database",
4242
"name": "database"
4343
},
44+
{
45+
"widget-type": "get-schema",
46+
"widget-category": "plugin"
47+
},
4448
{
4549
"widget-type": "textarea",
4650
"label": "Import Query",
4751
"name": "importQuery",
4852
"widget-attributes": {
4953
"rows": "4"
50-
},
51-
"plugin-function": {
52-
"widget": "outputSchema",
53-
"output-property": "schema",
54-
"omit-properties": [
55-
{
56-
"name": "schema"
57-
}
58-
],
59-
"add-properties": [
60-
{
61-
"name": "query",
62-
"plugin-property-for-value": "importQuery",
63-
"widget-type": "textarea",
64-
"label": "Query",
65-
"widget-attributes": {
66-
"rows": "4"
67-
}
68-
}
69-
]
7054
}
7155
},
7256
{

database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ public abstract class AbstractDBSource extends ReferenceBatchSource<LongWritable
7272

7373
private static final Logger LOG = LoggerFactory.getLogger(AbstractDBSource.class);
7474
private static final SchemaTypeAdapter SCHEMA_TYPE_ADAPTER = new SchemaTypeAdapter();
75+
private static final Pattern CONDITIONS_AND = Pattern.compile("\\$conditions (and|or)\\s+",
76+
Pattern.CASE_INSENSITIVE);
77+
private static final Pattern AND_CONDITIONS = Pattern.compile("\\s+(and|or) \\$conditions",
78+
Pattern.CASE_INSENSITIVE);
79+
private static final Pattern WHERE_CONDITIONS = Pattern.compile("\\s+where \\$conditions",
80+
Pattern.CASE_INSENSITIVE);
7581

7682
protected final DBSourceConfig sourceConfig;
7783
protected Class<? extends Driver> driverClass;
@@ -81,20 +87,6 @@ public AbstractDBSource(DBSourceConfig sourceConfig) {
8187
this.sourceConfig = sourceConfig;
8288
}
8389

84-
private static String removeConditionsClause(String importQueryString) {
85-
importQueryString = importQueryString.replaceAll("\\s{2,}", " ");
86-
if (importQueryString.toUpperCase().contains("WHERE $CONDITIONS AND")) {
87-
importQueryString = importQueryString.replaceAll("(?i)" + Pattern.quote("$CONDITIONS AND"), "");
88-
} else if (importQueryString.toUpperCase().contains("WHERE $CONDITIONS")) {
89-
importQueryString = importQueryString.replaceAll("(?i)" + Pattern.quote("WHERE $CONDITIONS"), "");
90-
} else if (importQueryString.toUpperCase().contains("AND $CONDITIONS")) {
91-
importQueryString = importQueryString.replaceAll("(?i)" + Pattern.quote("AND $CONDITIONS"), "");
92-
} else if (importQueryString.toUpperCase().contains("$CONDITIONS")) {
93-
throw new IllegalArgumentException("Please remove the $CONDITIONS clause when fetching the input schema.");
94-
}
95-
return importQueryString;
96-
}
97-
9890
@Override
9991
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
10092
super.configurePipeline(pipelineConfigurer);
@@ -106,9 +98,9 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
10698
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
10799
FailureCollector collector = stageConfigurer.getFailureCollector();
108100
sourceConfig.validate(collector);
109-
if (!Strings.isNullOrEmpty(sourceConfig.schema)) {
101+
if (sourceConfig.getSchema() != null) {
110102
stageConfigurer.setOutputSchema(sourceConfig.getSchema());
111-
} else if (sourceConfig.query != null) {
103+
} else if (!sourceConfig.containsMacro(DBSourceConfig.IMPORT_QUERY)) {
112104
try {
113105
stageConfigurer.setOutputSchema(getSchema(driverClass));
114106
} catch (IllegalAccessException | InstantiationException e) {
@@ -131,7 +123,7 @@ public Schema getSchema(Class<? extends Driver> driverClass) throws IllegalAcces
131123
driverCleanup = loadPluginClassAndGetDriver(driverClass);
132124
try (Connection connection = getConnection()) {
133125
executeInitQueries(connection, sourceConfig.getInitQueries());
134-
String query = sourceConfig.query;
126+
String query = sourceConfig.importQuery;
135127
return loadSchemaFromDB(connection, query);
136128
} finally {
137129
driverCleanup.destroy();
@@ -152,6 +144,15 @@ private Schema loadSchemaFromDB(Connection connection, String query) throws SQLE
152144
return Schema.recordOf("outputSchema", getSchemaReader().getSchemaFields(resultSet));
153145
}
154146

147+
@VisibleForTesting
148+
static String removeConditionsClause(String importQueryString) {
149+
String query = importQueryString;
150+
query = CONDITIONS_AND.matcher(query).replaceAll("");
151+
query = AND_CONDITIONS.matcher(query).replaceAll("");
152+
query = WHERE_CONDITIONS.matcher(query).replaceAll("");
153+
return query;
154+
}
155+
155156
private Schema loadSchemaFromDB(Class<? extends Driver> driverClass)
156157
throws SQLException, IllegalAccessException, InstantiationException {
157158
String connectionString = sourceConfig.getConnectionString();
@@ -307,12 +308,6 @@ public abstract static class DBSourceConfig extends DBConfig {
307308
public static final String SCHEMA = "schema";
308309
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
309310

310-
// this is a hidden property, only used to fetch schema
311-
@Nullable
312-
String query;
313-
314-
// only nullable for get schema button
315-
@Nullable
316311
@Name(IMPORT_QUERY)
317312
@Description("The SELECT query to use to import data from the specified table. " +
318313
"You can specify an arbitrary number of columns to import, or import all columns using *. " +
@@ -376,7 +371,11 @@ private void validate(FailureCollector collector) {
376371
TransactionIsolationLevel.validate(getTransactionIsolationLevel(), collector);
377372
}
378373

379-
if (!hasOneSplit && !containsMacro("importQuery") && !getImportQuery().contains("$CONDITIONS")) {
374+
if (!containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) {
375+
collector.addFailure("Import Query must be specified.", null).withConfigProperty(IMPORT_QUERY);
376+
}
377+
378+
if (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) {
380379
collector.addFailure("Invalid Import Query.",
381380
String.format("Import Query %s must contain the string '$CONDITIONS'.", importQuery))
382381
.withConfigProperty(IMPORT_QUERY);

database-commons/src/test/java/io/cdap/plugin/db/batch/DatabasePluginTestBase.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,17 @@ protected static void assertDeploymentFailure(ApplicationId appId, ETLBatchConfi
6969
}
7070
}
7171

72-
protected static void assertRuntimeFailure(ApplicationId appId, ETLBatchConfig etlConfig,
73-
ArtifactSummary datapipelineArtifact, String failureMessage, int runCount)
74-
throws Exception {
72+
protected static void assertDeployAppFailure(ApplicationId appId, ETLBatchConfig etlConfig,
73+
ArtifactSummary datapipelineArtifact) {
7574
AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(datapipelineArtifact, etlConfig);
76-
ApplicationManager appManager = deployApplication(appId, appRequest);
77-
final WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
78-
workflowManager.start();
79-
workflowManager.waitForRuns(ProgramRunStatus.FAILED, runCount, 3, TimeUnit.MINUTES);
75+
try {
76+
// this deploy application method will not throw appropriate exception, even it is 400, it will throw a
77+
// IllegalStateException, so just catch Exception here
78+
deployApplication(appId, appRequest);
79+
Assert.fail("Deploy app should fail");
80+
} catch (Exception e) {
81+
// expected
82+
}
8083
}
8184

8285
protected ApplicationManager deployETL(ETLPlugin sourcePlugin, ETLPlugin sinkPlugin,
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright © 2019 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.db.batch.source;
18+
19+
import org.junit.Assert;
20+
import org.junit.Test;
21+
22+
/**
23+
* Tests for {@link AbstractDBSource#removeConditionsClause(String)}
24+
*/
25+
public class ConditionsRemovalTest {
26+
27+
@Test
28+
public void testCasePreserved() {
29+
Assert.assertEquals("SeLecT * from mY_TAble",
30+
AbstractDBSource.removeConditionsClause("SeLecT * from mY_TAble where $CONDITIONS"));
31+
}
32+
33+
@Test
34+
public void testAndConditions() {
35+
Assert.assertEquals("select * from my_table where id > 3",
36+
AbstractDBSource.removeConditionsClause(
37+
"select * from my_table where id > 3 and $CONDITIONS"));
38+
}
39+
40+
@Test
41+
public void testConditionsAnd() {
42+
Assert.assertEquals("select * from my_table where id > 3",
43+
AbstractDBSource.removeConditionsClause(
44+
"select * from my_table where $CONDITIONS and id > 3"));
45+
}
46+
47+
@Test
48+
public void testConditionsInMiddleAnd() {
49+
Assert.assertEquals(
50+
"select * from my_table where id > 3 and id < 10",
51+
AbstractDBSource.removeConditionsClause(
52+
"select * from my_table where id > 3 and $CONDITIONS and id < 10"));
53+
}
54+
55+
@Test
56+
public void testConditionsInMiddleOr() {
57+
Assert.assertEquals(
58+
"select * from my_table where id > 3 or id < 10",
59+
AbstractDBSource.removeConditionsClause(
60+
"select * from my_table where id > 3 or $CONDITIONS or id < 10"));
61+
}
62+
}

db2-plugin/src/test/java/io/cdap/plugin/db2/Db2SourceTestRun.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -309,9 +309,7 @@ public void testNonExistentDBTable() throws Exception {
309309
.addConnection(sourceBadName.getName(), sink.getName())
310310
.build();
311311
ApplicationId appId = NamespaceId.DEFAULT.app("dbSourceNonExistingTest");
312-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
313-
"ETL Application with DB Source should have failed because of a " +
314-
"non-existent source table.", 1);
312+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
315313

316314
// Bad connection
317315
ETLPlugin sourceBadConnConfig = new ETLPlugin(
@@ -336,8 +334,6 @@ public void testNonExistentDBTable() throws Exception {
336334
.addStage(sink)
337335
.addConnection(sourceBadConn.getName(), sink.getName())
338336
.build();
339-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
340-
"ETL Application with DB Source should have failed because of a " +
341-
"non-existent source database.", 2);
337+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
342338
}
343339
}

db2-plugin/widgets/Db2-batchsource.json

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -44,32 +44,16 @@
4444
"label": "Database",
4545
"name": "database"
4646
},
47+
{
48+
"widget-type": "get-schema",
49+
"widget-category": "plugin"
50+
},
4751
{
4852
"widget-type": "textarea",
4953
"label": "Import Query",
5054
"name": "importQuery",
5155
"widget-attributes": {
5256
"rows": "4"
53-
},
54-
"plugin-function": {
55-
"widget": "outputSchema",
56-
"output-property": "schema",
57-
"omit-properties": [
58-
{
59-
"name": "schema"
60-
}
61-
],
62-
"add-properties": [
63-
{
64-
"name": "query",
65-
"plugin-property-for-value": "importQuery",
66-
"widget-type": "textarea",
67-
"label": "Query",
68-
"widget-attributes": {
69-
"rows": "4"
70-
}
71-
}
72-
]
7357
}
7458
},
7559
{

generic-database-plugin/src/test/java/io/cdap/plugin/db/batch/sink/DBSourceTestRun.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,7 @@ public void testNonExistentDBTable() throws Exception {
358358
.addConnection(sourceBadName.getName(), sink.getName())
359359
.build();
360360
ApplicationId appId = NamespaceId.DEFAULT.app("dbSourceNonExistingTest");
361-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
362-
"ETL Application with DB Source should have failed because of a " +
363-
"non-existent source table.", 1);
361+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
364362

365363
// Bad connection
366364
String badConnection = String.format("jdbc:hsqldb:hsql://localhost/%sWRONG", getDatabase());
@@ -382,8 +380,6 @@ public void testNonExistentDBTable() throws Exception {
382380
.addStage(sink)
383381
.addConnection(sourceBadConn.getName(), sink.getName())
384382
.build();
385-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
386-
"ETL Application with DB Source should have failed because of a " +
387-
"non-existent source database.", 2);
383+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
388384
}
389385
}

0 commit comments

Comments
 (0)