Skip to content

Commit 467a8cd

Browse files
adding enum for importqueryType and using it for validation.
1 parent 2cdd7b1 commit 467a8cd

File tree

5 files changed

+158
-100
lines changed

5 files changed

+158
-100
lines changed

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 49 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,15 @@
3737
import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig;
3838
import io.cdap.plugin.db.source.AbstractDBSource;
3939
import io.cdap.plugin.util.DBUtils;
40+
import io.cdap.plugin.util.ImportQueryType;
4041
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
4142

4243
import java.util.Collections;
4344
import java.util.Map;
4445
import javax.annotation.Nullable;
4546

4647
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.IMPORT_QUERY;
48+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.PROPERTY_IMPORT_QUERY_TYPE;
4749
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.TABLE_NAME;
4850

4951
/**
@@ -75,29 +77,31 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
7577
}
7678
return;
7779
}
78-
String importQueryType = sourceConfig.getImportQueryType();
79-
boolean isTable = TABLE_NAME.equalsIgnoreCase(importQueryType);
80-
boolean isImportQuery = IMPORT_QUERY.equalsIgnoreCase(importQueryType);
81-
82-
if (isTable) {
83-
if (Strings.isNullOrEmpty(sourceConfig.getTableName())) {
84-
collector.addFailure(
85-
"'tableName' must be specified when importQueryType is 'table'.",
86-
"Provide a value for 'tableName'."
87-
).withConfigProperty(TABLE_NAME);
88-
}
89-
} else if (isImportQuery) {
90-
if (Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
91-
collector.addFailure(
92-
"'importQuery' must be specified when importQueryType is 'importQuery'.",
93-
"Provide a value for 'importQuery'."
94-
).withConfigProperty(IMPORT_QUERY);
80+
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
81+
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.getValue().equals(sourceConfig.importQueryType);
82+
boolean isTableNameSelected = ImportQueryType.TABLE_NAME.getValue().equals(sourceConfig.importQueryType);
83+
84+
if (isImportQuerySelected && !sourceConfig.containsMacro(IMPORT_QUERY) &&
85+
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
86+
collector.addFailure("Import Query cannot be empty", null)
87+
.withConfigProperty(IMPORT_QUERY);
88+
89+
} else if (isTableNameSelected && !sourceConfig.containsMacro(TABLE_NAME) &&
90+
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
91+
collector.addFailure("Table Name cannot be empty", null)
92+
.withConfigProperty(TABLE_NAME);
9593
}
9694
} else {
97-
collector.addFailure(
98-
"Both import query and table name are empty'.",
99-
"Either import query or tableName should be given. Both can not be null together."
100-
);
95+
boolean isImportQueryMissing = !sourceConfig.containsMacro(IMPORT_QUERY) &&
96+
Strings.isNullOrEmpty(sourceConfig.getImportQuery());
97+
boolean isTableNameMissing = !sourceConfig.containsMacro(TABLE_NAME) &&
98+
Strings.isNullOrEmpty(sourceConfig.getTableName());
99+
100+
if (isImportQueryMissing && isTableNameMissing) {
101+
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
102+
.withConfigProperty(IMPORT_QUERY)
103+
.withConfigProperty(TABLE_NAME);
104+
}
101105
}
102106
collector.getOrThrowException();
103107
super.configurePipeline(pipelineConfigurer);
@@ -106,31 +110,33 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
106110
@Override
107111
public void prepareRun(BatchSourceContext context) throws Exception {
108112
FailureCollector collector = context.getFailureCollector();
109-
String importQueryType = sourceConfig.getImportQueryType();
110-
boolean isTable = TABLE_NAME.equalsIgnoreCase(importQueryType);
111-
boolean isImportQuery = IMPORT_QUERY.equalsIgnoreCase(importQueryType);
112-
113-
if (isTable) {
114-
if (Strings.isNullOrEmpty(sourceConfig.getTableName())) {
115-
collector.addFailure(
116-
"'tableName' must be specified when importQueryType is 'table'.",
117-
"Provide a value for 'tableName'."
118-
).withConfigProperty(TABLE_NAME);
119-
}
120-
} else if (isImportQuery) {
121-
if (Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
122-
collector.addFailure(
123-
"'importQuery' must be specified when importQueryType is 'importQuery'.",
124-
"Provide a value for 'importQuery'."
125-
).withConfigProperty(IMPORT_QUERY);
113+
114+
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
115+
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.getValue().equals(sourceConfig.importQueryType);
116+
boolean isTableNameSelected = ImportQueryType.TABLE_NAME.getValue().equals(sourceConfig.importQueryType);
117+
118+
if (isImportQuerySelected && !sourceConfig.containsMacro(IMPORT_QUERY) &&
119+
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
120+
collector.addFailure("Import Query cannot be empty", null)
121+
.withConfigProperty(IMPORT_QUERY);
122+
123+
} else if (isTableNameSelected && !sourceConfig.containsMacro(TABLE_NAME) &&
124+
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
125+
collector.addFailure("Table Name cannot be empty", null)
126+
.withConfigProperty(TABLE_NAME);
126127
}
127128
} else {
128-
collector.addFailure(
129-
"Both import query and table name are empty'.",
130-
"Either import query or tableName should be given. Both can not be null together."
131-
);
129+
boolean isImportQueryMissing = !sourceConfig.containsMacro(IMPORT_QUERY) &&
130+
Strings.isNullOrEmpty(sourceConfig.getImportQuery());
131+
boolean isTableNameMissing = !sourceConfig.containsMacro(TABLE_NAME) &&
132+
Strings.isNullOrEmpty(sourceConfig.getTableName());
133+
134+
if (isImportQueryMissing && isTableNameMissing) {
135+
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
136+
.withConfigProperty(IMPORT_QUERY)
137+
.withConfigProperty(TABLE_NAME);
138+
}
132139
}
133-
134140
collector.getOrThrowException();
135141
super.prepareRun(context);
136142
}

database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.cdap.plugin.db.TransactionIsolationLevel;
2929
import io.cdap.plugin.db.connector.AbstractDBConnectorConfig;
3030
import io.cdap.plugin.db.source.AbstractDBSource;
31+
import io.cdap.plugin.util.ImportQueryType;
3132

3233
import java.io.IOException;
3334
import java.util.Collections;
@@ -55,6 +56,7 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem
5556
@Description(Constants.Reference.REFERENCE_NAME_DESCRIPTION)
5657
public String referenceName;
5758

59+
@Nullable
5860
@Name(PROPERTY_IMPORT_QUERY_TYPE)
5961
@Description("Whether to select Table Name or Import Query to extract the data.")
6062
public String importQueryType;
@@ -119,14 +121,9 @@ public String getTableName() {
119121
return tableName;
120122
}
121123

124+
@Nullable
122125
public String getImportQueryType() {
123-
if (!Strings.isNullOrEmpty(tableName)) {
124-
return TABLE_NAME;
125-
} else if (!Strings.isNullOrEmpty(importQuery)) {
126-
return IMPORT_QUERY;
127-
} else {
128-
return null;
129-
}
126+
return importQueryType == null ? ImportQueryType.IMPORT_QUERY.name() : importQueryType;
130127
}
131128

132129
public String getBoundingQuery() {

database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@
4949
import io.cdap.plugin.db.DBRecord;
5050
import io.cdap.plugin.db.SchemaReader;
5151
import io.cdap.plugin.db.TransactionIsolationLevel;
52-
import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig;
5352
import io.cdap.plugin.db.config.DatabaseSourceConfig;
5453
import io.cdap.plugin.util.DBUtils;
5554
import io.cdap.plugin.util.DriverCleanup;
55+
import io.cdap.plugin.util.ImportQueryType;
5656
import org.apache.hadoop.io.LongWritable;
5757
import org.apache.hadoop.mapreduce.MRJobConfig;
5858
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -165,9 +165,10 @@ public Schema getSchema() throws SQLException {
165165
try (Connection connection = getConnection()) {
166166
executeInitQueries(connection, sourceConfig.getInitQueries());
167167
String query = sourceConfig.getImportQuery();
168-
if (AbstractDBSpecificSourceConfig.IMPORT_QUERY.equalsIgnoreCase(sourceConfig.getImportQueryType())) {
168+
ImportQueryType type = ImportQueryType.fromString(sourceConfig.getImportQueryType());
169+
if (type == ImportQueryType.IMPORT_QUERY) {
169170
return loadSchemaFromDBwithQuery(connection, query);
170-
} else if (AbstractDBSpecificSourceConfig.TABLE_NAME.equalsIgnoreCase(sourceConfig.getImportQueryType())) {
171+
} else if (type == ImportQueryType.TABLE_NAME) {
171172
List<Schema.Field> fields = getSchemaReader().getSchemaFields(connection, sourceConfig.getTableName());
172173
return Schema.recordOf("schema", fields);
173174
} else {
@@ -211,10 +212,11 @@ private Schema loadSchemaFromDB(Class<? extends Driver> driverClass)
211212
executeInitQueries(connection, sourceConfig.getInitQueries());
212213
String importQuery = sourceConfig.getImportQuery();
213214
String tableName = sourceConfig.getTableName();
214-
if (AbstractDBSpecificSourceConfig.IMPORT_QUERY.equalsIgnoreCase(sourceConfig.getImportQueryType())) {
215-
return loadSchemaFromDBwithQuery(connection, importQuery);
216-
} else {
215+
ImportQueryType type = ImportQueryType.fromString(sourceConfig.getImportQueryType());
216+
if (type == ImportQueryType.TABLE_NAME) {
217217
return loadSchemaFromDBwithTableName(connection, tableName);
218+
} else {
219+
return loadSchemaFromDBwithQuery(connection, importQuery);
218220
}
219221
} catch (SQLException e) {
220222
// wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc driver in classpath
@@ -352,9 +354,9 @@ public ConnectionConfigAccessor getConnectionConfigAccessor (String driverClassN
352354
if (sourceConfig.getFetchSize() != null) {
353355
connectionConfigAccessor.setFetchSize(sourceConfig.getFetchSize());
354356
}
355-
357+
ImportQueryType type = ImportQueryType.fromString(sourceConfig.getImportQueryType());
356358
String query;
357-
if (!Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
359+
if (type == ImportQueryType.IMPORT_QUERY) {
358360
query = sourceConfig.getImportQuery();
359361
} else {
360362
query = String.format("SELECT * FROM %s", sourceConfig.getTableName());
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.util;
18+
19+
/**
20+
* Enum to specify the import query type used.
21+
*/
22+
public enum ImportQueryType {
23+
IMPORT_QUERY("importQuery"),
24+
TABLE_NAME("tableName");
25+
26+
private String value;
27+
28+
ImportQueryType(String value) {
29+
this.value = value;
30+
}
31+
32+
public String getValue() {
33+
return value;
34+
}
35+
36+
public static ImportQueryType fromString(String value) {
37+
if (value == null) {
38+
return ImportQueryType.IMPORT_QUERY;
39+
}
40+
41+
for (ImportQueryType type : ImportQueryType.values()) {
42+
if (type.value.equalsIgnoreCase(value)) {
43+
return type;
44+
}
45+
}
46+
return ImportQueryType.IMPORT_QUERY;
47+
}
48+
}

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java

Lines changed: 47 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,15 @@
3838
import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig;
3939
import io.cdap.plugin.db.source.AbstractDBSource;
4040
import io.cdap.plugin.util.DBUtils;
41+
import io.cdap.plugin.util.ImportQueryType;
4142
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
4243

4344
import java.util.Map;
4445
import javax.annotation.Nullable;
4546

4647
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.IMPORT_QUERY;
48+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.PROPERTY_IMPORT_QUERY_TYPE;
4749
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.TABLE_NAME;
48-
4950
/**
5051
* Batch source to read from PostgreSQL.
5152
*/
@@ -73,29 +74,31 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
7374
}
7475
return;
7576
}
76-
String importQueryType = sourceConfig.getImportQueryType();
77-
boolean isTable = TABLE_NAME.equalsIgnoreCase(importQueryType);
78-
boolean isImportQuery = IMPORT_QUERY.equalsIgnoreCase(importQueryType);
79-
80-
if (isTable) {
81-
if (Strings.isNullOrEmpty(sourceConfig.getTableName())) {
82-
collector.addFailure(
83-
"'tableName' must be specified when importQueryType is 'table'.",
84-
"Provide a value for 'tableName'."
85-
).withConfigProperty(TABLE_NAME);
86-
}
87-
} else if (isImportQuery) {
88-
if (Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
89-
collector.addFailure(
90-
"'importQuery' must be specified when importQueryType is 'importQuery'.",
91-
"Provide a value for 'importQuery'."
92-
).withConfigProperty(IMPORT_QUERY);
77+
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
78+
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.getValue().equals(sourceConfig.importQueryType);
79+
boolean isTableNameSelected = ImportQueryType.TABLE_NAME.getValue().equals(sourceConfig.importQueryType);
80+
81+
if (isImportQuerySelected && !sourceConfig.containsMacro(IMPORT_QUERY) &&
82+
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
83+
collector.addFailure("Import Query cannot be empty", null)
84+
.withConfigProperty(IMPORT_QUERY);
85+
86+
} else if (isTableNameSelected && !sourceConfig.containsMacro(TABLE_NAME) &&
87+
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
88+
collector.addFailure("Table Name cannot be empty", null)
89+
.withConfigProperty(TABLE_NAME);
9390
}
9491
} else {
95-
collector.addFailure(
96-
"Both import query and table name are empty'.",
97-
"Either import query or tableName should be given. Both can not be null together."
98-
);
92+
boolean isImportQueryMissing = !sourceConfig.containsMacro(IMPORT_QUERY) &&
93+
Strings.isNullOrEmpty(sourceConfig.getImportQuery());
94+
boolean isTableNameMissing = !sourceConfig.containsMacro(TABLE_NAME) &&
95+
Strings.isNullOrEmpty(sourceConfig.getTableName());
96+
97+
if (isImportQueryMissing && isTableNameMissing) {
98+
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
99+
.withConfigProperty(IMPORT_QUERY)
100+
.withConfigProperty(TABLE_NAME);
101+
}
99102
}
100103
collector.getOrThrowException();
101104
super.configurePipeline(pipelineConfigurer);
@@ -108,29 +111,31 @@ public void prepareRun(BatchSourceContext context) throws Exception {
108111
return;
109112
}
110113

111-
String importQueryType = sourceConfig.getImportQueryType();
112-
boolean isTable = TABLE_NAME.equalsIgnoreCase(importQueryType);
113-
boolean isImportQuery = IMPORT_QUERY.equalsIgnoreCase(importQueryType);
114+
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
115+
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.getValue().equals(sourceConfig.importQueryType);
116+
boolean isTableNameSelected = ImportQueryType.TABLE_NAME.getValue().equals(sourceConfig.importQueryType);
114117

115-
if (isTable) {
116-
if (Strings.isNullOrEmpty(sourceConfig.getTableName())) {
117-
collector.addFailure(
118-
"'tableName' must be specified when importQueryType is 'table'.",
119-
"Provide a value for 'tableName'."
120-
).withConfigProperty(TABLE_NAME);
121-
}
122-
} else if (isImportQuery) {
123-
if (Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
124-
collector.addFailure(
125-
"'importQuery' must be specified when importQueryType is 'importQuery'.",
126-
"Provide a value for 'importQuery'."
127-
).withConfigProperty(IMPORT_QUERY);
118+
if (isImportQuerySelected && !sourceConfig.containsMacro(IMPORT_QUERY) &&
119+
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
120+
collector.addFailure("Import Query cannot be empty", null)
121+
.withConfigProperty(IMPORT_QUERY);
122+
123+
} else if (isTableNameSelected && !sourceConfig.containsMacro(TABLE_NAME) &&
124+
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
125+
collector.addFailure("Table Name cannot be empty", null)
126+
.withConfigProperty(TABLE_NAME);
128127
}
129128
} else {
130-
collector.addFailure(
131-
"Both import query and table name are empty'.",
132-
"Either import query or tableName should be given. Both can not be null together."
133-
);
129+
boolean isImportQueryMissing = !sourceConfig.containsMacro(IMPORT_QUERY) &&
130+
Strings.isNullOrEmpty(sourceConfig.getImportQuery());
131+
boolean isTableNameMissing = !sourceConfig.containsMacro(TABLE_NAME) &&
132+
Strings.isNullOrEmpty(sourceConfig.getTableName());
133+
134+
if (isImportQueryMissing && isTableNameMissing) {
135+
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
136+
.withConfigProperty(IMPORT_QUERY)
137+
.withConfigProperty(TABLE_NAME);
138+
}
134139
}
135140
collector.getOrThrowException();
136141
super.prepareRun(context);

0 commit comments

Comments
 (0)