Skip to content

Commit 39b14c0

Browse files
authored
Merge pull request #66 from data-integrations/multiple-fixes
Multiple fixes
2 parents ee18306 + f8bc02f commit 39b14c0

File tree

24 files changed

+166
-292
lines changed

24 files changed

+166
-292
lines changed

aurora-mysql-plugin/src/test/java/io/cdap/plugin/aurora/mysql/AuroraMysqlSourceTestRun.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -329,9 +329,7 @@ public void testNonExistentDBTable() throws Exception {
329329
.addConnection(sourceBadName.getName(), sink.getName())
330330
.build();
331331
ApplicationId appId = NamespaceId.DEFAULT.app("dbSourceNonExistingTest");
332-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
333-
"ETL Application with DB Source should have failed because of a " +
334-
"non-existent source table.", 1);
332+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
335333

336334
// Bad connection
337335
ETLPlugin sourceBadConnConfig = new ETLPlugin(
@@ -357,8 +355,6 @@ public void testNonExistentDBTable() throws Exception {
357355
.addStage(sink)
358356
.addConnection(sourceBadConn.getName(), sink.getName())
359357
.build();
360-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
361-
"ETL Application with DB Source should have failed because of a " +
362-
"non-existent source database.", 2);
358+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
363359
}
364360
}

aurora-mysql-plugin/widgets/AuroraMysql-batchsource.json

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,32 +41,16 @@
4141
"label": "Database",
4242
"name": "database"
4343
},
44+
{
45+
"widget-type": "get-schema",
46+
"widget-category": "plugin"
47+
},
4448
{
4549
"widget-type": "textarea",
4650
"label": "Import Query",
4751
"name": "importQuery",
4852
"widget-attributes": {
4953
"rows": "4"
50-
},
51-
"plugin-function": {
52-
"widget": "outputSchema",
53-
"output-property": "schema",
54-
"omit-properties": [
55-
{
56-
"name": "schema"
57-
}
58-
],
59-
"add-properties": [
60-
{
61-
"name": "query",
62-
"plugin-property-for-value": "importQuery",
63-
"widget-type": "textarea",
64-
"label": "Query",
65-
"widget-attributes": {
66-
"rows": "4"
67-
}
68-
}
69-
]
7054
}
7155
},
7256
{

aurora-postgresql-plugin/src/test/java/io/cdap/plugin/auroradb/postgres/AuroraPostgresSourceTestRun.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,7 @@ public void testNonExistentDBTable() throws Exception {
299299
.addConnection(sourceBadName.getName(), sink.getName())
300300
.build();
301301
ApplicationId appId = NamespaceId.DEFAULT.app("dbSourceNonExistingTest");
302-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
303-
"ETL Application with DB Source should have failed because of a " +
304-
"non-existent source table.", 1);
302+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
305303

306304
// Bad connection
307305
ETLPlugin sourceBadConnConfig = new ETLPlugin(
@@ -326,8 +324,6 @@ public void testNonExistentDBTable() throws Exception {
326324
.addStage(sink)
327325
.addConnection(sourceBadConn.getName(), sink.getName())
328326
.build();
329-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
330-
"ETL Application with DB Source should have failed because of a " +
331-
"non-existent source database.", 2);
327+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
332328
}
333329
}

aurora-postgresql-plugin/widgets/AuroraPostgres-batchsource.json

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,32 +41,16 @@
4141
"label": "Database",
4242
"name": "database"
4343
},
44+
{
45+
"widget-type": "get-schema",
46+
"widget-category": "plugin"
47+
},
4448
{
4549
"widget-type": "textarea",
4650
"label": "Import Query",
4751
"name": "importQuery",
4852
"widget-attributes": {
4953
"rows": "4"
50-
},
51-
"plugin-function": {
52-
"widget": "outputSchema",
53-
"output-property": "schema",
54-
"omit-properties": [
55-
{
56-
"name": "schema"
57-
}
58-
],
59-
"add-properties": [
60-
{
61-
"name": "query",
62-
"plugin-property-for-value": "importQuery",
63-
"widget-type": "textarea",
64-
"label": "Query",
65-
"widget-attributes": {
66-
"rows": "4"
67-
}
68-
}
69-
]
7054
}
7155
},
7256
{

database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import java.util.List;
6464
import java.util.Properties;
6565
import java.util.regex.Pattern;
66+
import java.util.stream.Collectors;
6667
import javax.annotation.Nullable;
6768

6869
/**
@@ -72,6 +73,12 @@ public abstract class AbstractDBSource extends ReferenceBatchSource<LongWritable
7273

7374
private static final Logger LOG = LoggerFactory.getLogger(AbstractDBSource.class);
7475
private static final SchemaTypeAdapter SCHEMA_TYPE_ADAPTER = new SchemaTypeAdapter();
76+
private static final Pattern CONDITIONS_AND = Pattern.compile("\\$conditions (and|or)\\s+",
77+
Pattern.CASE_INSENSITIVE);
78+
private static final Pattern AND_CONDITIONS = Pattern.compile("\\s+(and|or) \\$conditions",
79+
Pattern.CASE_INSENSITIVE);
80+
private static final Pattern WHERE_CONDITIONS = Pattern.compile("\\s+where \\$conditions",
81+
Pattern.CASE_INSENSITIVE);
7582

7683
protected final DBSourceConfig sourceConfig;
7784
protected Class<? extends Driver> driverClass;
@@ -81,20 +88,6 @@ public AbstractDBSource(DBSourceConfig sourceConfig) {
8188
this.sourceConfig = sourceConfig;
8289
}
8390

84-
private static String removeConditionsClause(String importQueryString) {
85-
importQueryString = importQueryString.replaceAll("\\s{2,}", " ");
86-
if (importQueryString.toUpperCase().contains("WHERE $CONDITIONS AND")) {
87-
importQueryString = importQueryString.replaceAll("(?i)" + Pattern.quote("$CONDITIONS AND"), "");
88-
} else if (importQueryString.toUpperCase().contains("WHERE $CONDITIONS")) {
89-
importQueryString = importQueryString.replaceAll("(?i)" + Pattern.quote("WHERE $CONDITIONS"), "");
90-
} else if (importQueryString.toUpperCase().contains("AND $CONDITIONS")) {
91-
importQueryString = importQueryString.replaceAll("(?i)" + Pattern.quote("AND $CONDITIONS"), "");
92-
} else if (importQueryString.toUpperCase().contains("$CONDITIONS")) {
93-
throw new IllegalArgumentException("Please remove the $CONDITIONS clause when fetching the input schema.");
94-
}
95-
return importQueryString;
96-
}
97-
9891
@Override
9992
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
10093
super.configurePipeline(pipelineConfigurer);
@@ -106,9 +99,9 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
10699
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
107100
FailureCollector collector = stageConfigurer.getFailureCollector();
108101
sourceConfig.validate(collector);
109-
if (!Strings.isNullOrEmpty(sourceConfig.schema)) {
102+
if (sourceConfig.getSchema() != null) {
110103
stageConfigurer.setOutputSchema(sourceConfig.getSchema());
111-
} else if (sourceConfig.query != null) {
104+
} else if (!sourceConfig.containsMacro(DBSourceConfig.IMPORT_QUERY)) {
112105
try {
113106
stageConfigurer.setOutputSchema(getSchema(driverClass));
114107
} catch (IllegalAccessException | InstantiationException e) {
@@ -131,7 +124,7 @@ public Schema getSchema(Class<? extends Driver> driverClass) throws IllegalAcces
131124
driverCleanup = loadPluginClassAndGetDriver(driverClass);
132125
try (Connection connection = getConnection()) {
133126
executeInitQueries(connection, sourceConfig.getInitQueries());
134-
String query = sourceConfig.query;
127+
String query = sourceConfig.importQuery;
135128
return loadSchemaFromDB(connection, query);
136129
} finally {
137130
driverCleanup.destroy();
@@ -152,6 +145,15 @@ private Schema loadSchemaFromDB(Connection connection, String query) throws SQLE
152145
return Schema.recordOf("outputSchema", getSchemaReader().getSchemaFields(resultSet));
153146
}
154147

148+
@VisibleForTesting
149+
static String removeConditionsClause(String importQueryString) {
150+
String query = importQueryString;
151+
query = CONDITIONS_AND.matcher(query).replaceAll("");
152+
query = AND_CONDITIONS.matcher(query).replaceAll("");
153+
query = WHERE_CONDITIONS.matcher(query).replaceAll("");
154+
return query;
155+
}
156+
155157
private Schema loadSchemaFromDB(Class<? extends Driver> driverClass)
156158
throws SQLException, IllegalAccessException, InstantiationException {
157159
String connectionString = sourceConfig.getConnectionString();
@@ -265,7 +267,12 @@ public void prepareRun(BatchSourceContext context) throws Exception {
265267
connectionConfigAccessor.setSchema(schemaStr);
266268
}
267269
LineageRecorder lineageRecorder = new LineageRecorder(context, sourceConfig.referenceName);
268-
lineageRecorder.createExternalDataset(sourceConfig.getSchema());
270+
Schema schema = sourceConfig.getSchema() == null ? schemaFromDB : sourceConfig.getSchema();
271+
lineageRecorder.createExternalDataset(schema);
272+
if (schema != null && schema.getFields() != null) {
273+
lineageRecorder.recordRead("Read", "Read from database plugin",
274+
schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
275+
}
269276
context.setInput(Input.of(sourceConfig.referenceName, new SourceInputFormatProvider(
270277
DataDrivenETLDBInputFormat.class, connectionConfigAccessor.getConfiguration())));
271278
}
@@ -307,12 +314,6 @@ public abstract static class DBSourceConfig extends DBConfig {
307314
public static final String SCHEMA = "schema";
308315
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
309316

310-
// this is a hidden property, only used to fetch schema
311-
@Nullable
312-
String query;
313-
314-
// only nullable for get schema button
315-
@Nullable
316317
@Name(IMPORT_QUERY)
317318
@Description("The SELECT query to use to import data from the specified table. " +
318319
"You can specify an arbitrary number of columns to import, or import all columns using *. " +
@@ -376,7 +377,11 @@ private void validate(FailureCollector collector) {
376377
TransactionIsolationLevel.validate(getTransactionIsolationLevel(), collector);
377378
}
378379

379-
if (!hasOneSplit && !containsMacro("importQuery") && !getImportQuery().contains("$CONDITIONS")) {
380+
if (!containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) {
381+
collector.addFailure("Import Query must be specified.", null).withConfigProperty(IMPORT_QUERY);
382+
}
383+
384+
if (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) {
380385
collector.addFailure("Invalid Import Query.",
381386
String.format("Import Query %s must contain the string '$CONDITIONS'.", importQuery))
382387
.withConfigProperty(IMPORT_QUERY);

database-commons/src/test/java/io/cdap/plugin/db/batch/DatabasePluginTestBase.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,17 @@ protected static void assertDeploymentFailure(ApplicationId appId, ETLBatchConfi
6969
}
7070
}
7171

72-
protected static void assertRuntimeFailure(ApplicationId appId, ETLBatchConfig etlConfig,
73-
ArtifactSummary datapipelineArtifact, String failureMessage, int runCount)
74-
throws Exception {
72+
protected static void assertDeployAppFailure(ApplicationId appId, ETLBatchConfig etlConfig,
73+
ArtifactSummary datapipelineArtifact) {
7574
AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(datapipelineArtifact, etlConfig);
76-
ApplicationManager appManager = deployApplication(appId, appRequest);
77-
final WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
78-
workflowManager.start();
79-
workflowManager.waitForRuns(ProgramRunStatus.FAILED, runCount, 3, TimeUnit.MINUTES);
75+
try {
76+
// this deploy application method will not throw an appropriate exception; even if the response is 400, it will throw an
77+
// IllegalStateException, so just catch Exception here
78+
deployApplication(appId, appRequest);
79+
Assert.fail("Deploy app should fail");
80+
} catch (Exception e) {
81+
// expected
82+
}
8083
}
8184

8285
protected ApplicationManager deployETL(ETLPlugin sourcePlugin, ETLPlugin sinkPlugin,
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright © 2019 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.db.batch.source;
18+
19+
import org.junit.Assert;
20+
import org.junit.Test;
21+
22+
/**
23+
* Tests for {@link AbstractDBSource#removeConditionsClause(String)}.
24+
*/
25+
public class ConditionsRemovalTest {
26+
27+
@Test
28+
public void testCasePreserved() {
29+
Assert.assertEquals("SeLecT * from mY_TAble",
30+
AbstractDBSource.removeConditionsClause("SeLecT * from mY_TAble where $CONDITIONS"));
31+
}
32+
33+
@Test
34+
public void testAndConditions() {
35+
Assert.assertEquals("select * from my_table where id > 3",
36+
AbstractDBSource.removeConditionsClause(
37+
"select * from my_table where id > 3 and $CONDITIONS"));
38+
}
39+
40+
@Test
41+
public void testConditionsAnd() {
42+
Assert.assertEquals("select * from my_table where id > 3",
43+
AbstractDBSource.removeConditionsClause(
44+
"select * from my_table where $CONDITIONS and id > 3"));
45+
}
46+
47+
@Test
48+
public void testConditionsInMiddleAnd() {
49+
Assert.assertEquals(
50+
"select * from my_table where id > 3 and id < 10",
51+
AbstractDBSource.removeConditionsClause(
52+
"select * from my_table where id > 3 and $CONDITIONS and id < 10"));
53+
}
54+
55+
@Test
56+
public void testConditionsInMiddleOr() {
57+
Assert.assertEquals(
58+
"select * from my_table where id > 3 or id < 10",
59+
AbstractDBSource.removeConditionsClause(
60+
"select * from my_table where id > 3 or $CONDITIONS or id < 10"));
61+
}
62+
}

db2-plugin/src/test/java/io/cdap/plugin/db2/Db2SourceTestRun.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -309,9 +309,7 @@ public void testNonExistentDBTable() throws Exception {
309309
.addConnection(sourceBadName.getName(), sink.getName())
310310
.build();
311311
ApplicationId appId = NamespaceId.DEFAULT.app("dbSourceNonExistingTest");
312-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
313-
"ETL Application with DB Source should have failed because of a " +
314-
"non-existent source table.", 1);
312+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
315313

316314
// Bad connection
317315
ETLPlugin sourceBadConnConfig = new ETLPlugin(
@@ -336,8 +334,6 @@ public void testNonExistentDBTable() throws Exception {
336334
.addStage(sink)
337335
.addConnection(sourceBadConn.getName(), sink.getName())
338336
.build();
339-
assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
340-
"ETL Application with DB Source should have failed because of a " +
341-
"non-existent source database.", 2);
337+
assertDeployAppFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT);
342338
}
343339
}

db2-plugin/widgets/Db2-batchsource.json

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -44,32 +44,16 @@
4444
"label": "Database",
4545
"name": "database"
4646
},
47+
{
48+
"widget-type": "get-schema",
49+
"widget-category": "plugin"
50+
},
4751
{
4852
"widget-type": "textarea",
4953
"label": "Import Query",
5054
"name": "importQuery",
5155
"widget-attributes": {
5256
"rows": "4"
53-
},
54-
"plugin-function": {
55-
"widget": "outputSchema",
56-
"output-property": "schema",
57-
"omit-properties": [
58-
{
59-
"name": "schema"
60-
}
61-
],
62-
"add-properties": [
63-
{
64-
"name": "query",
65-
"plugin-property-for-value": "importQuery",
66-
"widget-type": "textarea",
67-
"label": "Query",
68-
"widget-attributes": {
69-
"rows": "4"
70-
}
71-
}
72-
]
7357
}
7458
},
7559
{

0 commit comments

Comments
 (0)