Skip to content

Commit 2cdd7b1

Browse files
psainicsAbhishekKumar9984
authored and committed
Adding an import query type property to the DB plugin, which will only take effect in the Redshift and PostgreSQL plugins.
1 parent e5c5794 commit 2cdd7b1

File tree

24 files changed

+785
-77
lines changed

24 files changed

+785
-77
lines changed

amazon-redshift-plugin/docs/Redshift-batchsource.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDI
3131
The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query.
3232
The '$CONDITIONS' string is not required if numSplits is set to one.
3333

34+
**ImportQueryType** - determines how data is extracted—either by using a Table Name or a custom Import Query.
35+
36+
* **TableName**: Extracts data directly from a specified database table.
37+
3438
**Bounding Query:** Bounding Query should return the min and max of the values of the 'splitBy' field.
3539
For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one.
3640

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
111111
}
112112
sourceProperties.put(RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY,
113113
getTableQuery(path.getDatabase(), schema, table));
114+
sourceProperties.put(RedshiftSource.RedshiftSourceConfig.PROPERTY_IMPORT_QUERY_TYPE,
115+
RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY);
114116
sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
115117
}
116118

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.cdap.plugin.db.CommonSchemaReader;
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
25-
2625
import java.sql.ResultSet;
2726
import java.sql.ResultSetMetaData;
2827
import java.sql.SQLException;
@@ -56,34 +55,12 @@ public RedshiftSchemaReader(String sessionID) {
5655
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
5756
String typeName = metadata.getColumnTypeName(index);
5857
int columnType = metadata.getColumnType(index);
58+
int precision = metadata.getPrecision(index);
59+
String columnName = metadata.getColumnName(index);
60+
int scale = metadata.getScale(index);
61+
boolean isSigned = metadata.isSigned(index);
5962

60-
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
61-
return Schema.of(Schema.Type.STRING);
62-
}
63-
if (typeName.equalsIgnoreCase("INT")) {
64-
return Schema.of(Schema.Type.INT);
65-
}
66-
if (typeName.equalsIgnoreCase("BIGINT")) {
67-
return Schema.of(Schema.Type.LONG);
68-
}
69-
70-
// If it is a numeric type without precision then use the Schema of String to avoid any precision loss
71-
if (Types.NUMERIC == columnType) {
72-
int precision = metadata.getPrecision(index);
73-
if (precision == 0) {
74-
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
75-
+ "converting into STRING type to avoid any precision loss.",
76-
metadata.getColumnName(index),
77-
metadata.getColumnTypeName(index)));
78-
return Schema.of(Schema.Type.STRING);
79-
}
80-
}
81-
82-
if (typeName.equalsIgnoreCase("timestamp")) {
83-
return Schema.of(Schema.LogicalType.DATETIME);
84-
}
85-
86-
return super.getSchema(metadata, index);
63+
return getSchema(typeName, columnType, precision, scale, columnName, isSigned, true);
8764
}
8865

8966
@Override
@@ -113,5 +90,37 @@ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLExcepti
11390
}
11491
return schemaFields;
11592
}
116-
93+
/**
94+
* Maps database column type information to a corresponding {@link Schema}.
95+
*
96+
* @param typeName the SQL type name
97+
* @param columnType the JDBC type code
98+
* @param precision the column precision
99+
* @param scale the column scale
100+
* @param columnName the column name
101+
* @return the mapped {@link Schema} type
102+
*/
103+
@Override
104+
public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName,
105+
boolean isSigned, boolean handleAsDecimal) {
106+
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
107+
return Schema.of(Schema.Type.STRING);
108+
}
109+
if ("INT".equalsIgnoreCase(typeName)) {
110+
return Schema.of(Schema.Type.INT);
111+
}
112+
if ("BIGINT".equalsIgnoreCase(typeName)) {
113+
return Schema.of(Schema.Type.LONG);
114+
}
115+
if (Types.NUMERIC == columnType && precision == 0) {
116+
LOG.warn(String.format("Field '%s' is a %s type without precision and scale," +
117+
" converting into STRING type to avoid any precision loss.",
118+
columnName, typeName));
119+
return Schema.of(Schema.Type.STRING);
120+
}
121+
if ("timestamp".equalsIgnoreCase(typeName)) {
122+
return Schema.of(Schema.LogicalType.DATETIME);
123+
}
124+
return super.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal);
125+
}
117126
}

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
package io.cdap.plugin.amazon.redshift;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.base.Strings;
2021
import io.cdap.cdap.api.annotation.Description;
2122
import io.cdap.cdap.api.annotation.Macro;
2223
import io.cdap.cdap.api.annotation.Metadata;
2324
import io.cdap.cdap.api.annotation.MetadataProperty;
2425
import io.cdap.cdap.api.annotation.Name;
2526
import io.cdap.cdap.api.annotation.Plugin;
2627
import io.cdap.cdap.etl.api.FailureCollector;
28+
import io.cdap.cdap.etl.api.PipelineConfigurer;
29+
import io.cdap.cdap.etl.api.StageConfigurer;
2730
import io.cdap.cdap.etl.api.batch.BatchSource;
2831
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
2932
import io.cdap.cdap.etl.api.connector.Connector;
@@ -40,6 +43,9 @@
4043
import java.util.Map;
4144
import javax.annotation.Nullable;
4245

46+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.IMPORT_QUERY;
47+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.TABLE_NAME;
48+
4349
/**
4450
* Batch source to read from an Amazon Redshift database.
4551
*/
@@ -59,6 +65,76 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
5965
this.redshiftSourceConfig = redshiftSourceConfig;
6066
}
6167

68+
@Override
69+
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
70+
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
71+
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
72+
if (sourceConfig.containsMacro(TABLE_NAME) || sourceConfig.containsMacro(IMPORT_QUERY)) {
73+
if (sourceConfig.getSchema() != null) {
74+
stageConfigurer.setOutputSchema(sourceConfig.getSchema());
75+
}
76+
return;
77+
}
78+
String importQueryType = sourceConfig.getImportQueryType();
79+
boolean isTable = TABLE_NAME.equalsIgnoreCase(importQueryType);
80+
boolean isImportQuery = IMPORT_QUERY.equalsIgnoreCase(importQueryType);
81+
82+
if (isTable) {
83+
if (Strings.isNullOrEmpty(sourceConfig.getTableName())) {
84+
collector.addFailure(
85+
"'tableName' must be specified when importQueryType is 'table'.",
86+
"Provide a value for 'tableName'."
87+
).withConfigProperty(TABLE_NAME);
88+
}
89+
} else if (isImportQuery) {
90+
if (Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
91+
collector.addFailure(
92+
"'importQuery' must be specified when importQueryType is 'importQuery'.",
93+
"Provide a value for 'importQuery'."
94+
).withConfigProperty(IMPORT_QUERY);
95+
}
96+
} else {
97+
collector.addFailure(
98+
"Both import query and table name are empty'.",
99+
"Either import query or tableName should be given. Both can not be null together."
100+
);
101+
}
102+
collector.getOrThrowException();
103+
super.configurePipeline(pipelineConfigurer);
104+
}
105+
106+
@Override
107+
public void prepareRun(BatchSourceContext context) throws Exception {
108+
FailureCollector collector = context.getFailureCollector();
109+
String importQueryType = sourceConfig.getImportQueryType();
110+
boolean isTable = TABLE_NAME.equalsIgnoreCase(importQueryType);
111+
boolean isImportQuery = IMPORT_QUERY.equalsIgnoreCase(importQueryType);
112+
113+
if (isTable) {
114+
if (Strings.isNullOrEmpty(sourceConfig.getTableName())) {
115+
collector.addFailure(
116+
"'tableName' must be specified when importQueryType is 'table'.",
117+
"Provide a value for 'tableName'."
118+
).withConfigProperty(TABLE_NAME);
119+
}
120+
} else if (isImportQuery) {
121+
if (Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
122+
collector.addFailure(
123+
"'importQuery' must be specified when importQueryType is 'importQuery'.",
124+
"Provide a value for 'importQuery'."
125+
).withConfigProperty(IMPORT_QUERY);
126+
}
127+
} else {
128+
collector.addFailure(
129+
"Both import query and table name are empty'.",
130+
"Either import query or tableName should be given. Both can not be null together."
131+
);
132+
}
133+
134+
collector.getOrThrowException();
135+
super.prepareRun(context);
136+
}
137+
62138
@Override
63139
protected SchemaReader getSchemaReader() {
64140
return new RedshiftSchemaReader();

amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftFailedConnectionTest.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.Test;
2121

2222
import java.io.IOException;
23+
import static org.junit.Assert.assertTrue;
2324

2425
public class RedshiftFailedConnectionTest extends DBSpecificFailedConnectionTest {
2526
private static final String JDBC_DRIVER_CLASS_NAME = "com.amazon.redshift.Driver";
@@ -28,11 +29,23 @@ public class RedshiftFailedConnectionTest extends DBSpecificFailedConnectionTest
2829
public void test() throws ClassNotFoundException, IOException {
2930

3031
RedshiftConnector connector = new RedshiftConnector(
31-
new RedshiftConnectorConfig("username", "password", "jdbc", "", "localhost", "db", 5432));
32+
new RedshiftConnectorConfig("username", "password", "jdbc", "",
33+
"localhost", "db", 5432));
3234

33-
super.test(JDBC_DRIVER_CLASS_NAME, connector, "Failed to create connection to database via connection string: " +
34-
"jdbc:redshift://localhost:5432/db and arguments: " +
35-
"{user=username}. Error: ConnectException: Connection refused " +
36-
"(Connection refused).");
35+
String expectedPrefix = "Failed to create connection to database via connection string: " +
36+
"jdbc:redshift://localhost:5432/db and arguments: {user=username}. Error:";
37+
try {
38+
super.test(JDBC_DRIVER_CLASS_NAME, connector, expectedPrefix + " ConnectException: Connection " +
39+
"refused (Connection refused).");
40+
} catch (AssertionError e) {
41+
// Accept either ConnectException or SunCertPathBuilderException
42+
String message = e.getMessage();
43+
assertTrue(
44+
"Expected either ConnectException or SunCertPathBuilderException, but got: " + message,
45+
message.contains("ConnectException: Connection refused") ||
46+
message.contains("SunCertPathBuilderException: unable to find valid certification " +
47+
"path to requested target")
48+
);
49+
}
3750
}
3851
}

amazon-redshift-plugin/widgets/Redshift-batchsource.json

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,30 @@
108108
{
109109
"label": "SQL Query",
110110
"properties": [
111+
{
112+
"widget-type": "radio-group",
113+
"label": "Import Query Type",
114+
"name": "importQueryType",
115+
"widget-attributes": {
116+
"layout": "inline",
117+
"default": "importQuery",
118+
"options": [
119+
{
120+
"id": "importQuery",
121+
"label": "Native Query"
122+
},
123+
{
124+
"id": "tableName",
125+
"label": "Named Table"
126+
}
127+
]
128+
}
129+
},
130+
{
131+
"widget-type": "textbox",
132+
"label": "Table Name",
133+
"name": "tableName"
134+
},
111135
{
112136
"widget-type": "textarea",
113137
"label": "Import Query",
@@ -229,6 +253,30 @@
229253
}
230254
]
231255
},
256+
{
257+
"name": "ImportQuery",
258+
"condition": {
259+
"expression": "importQueryType != 'tableName'"
260+
},
261+
"show": [
262+
{
263+
"type": "property",
264+
"name": "importQuery"
265+
}
266+
]
267+
},
268+
{
269+
"name": "NativeTableName",
270+
"condition": {
271+
"expression": "importQueryType == 'tableName'"
272+
},
273+
"show": [
274+
{
275+
"type": "property",
276+
"name": "tableName"
277+
}
278+
]
279+
}
232280
],
233281
"jump-config": {
234282
"datasets": [

cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@
127127
{
128128
"label": "CloudSQL Properties",
129129
"properties": [
130+
{
131+
"widget-type": "hidden",
132+
"label": "Import Query Type",
133+
"name": "importQueryType",
134+
"widget-attributes": {
135+
"layout": "inline",
136+
"default": "importQuery",
137+
"options": [
138+
{
139+
"id": "importQuery",
140+
"label": "Native Query"
141+
},
142+
{
143+
"id": "tableName",
144+
"label": "Named Table"
145+
}
146+
]
147+
}
148+
},
149+
{
150+
"widget-type": "hidden",
151+
"label": "Table Name",
152+
"name": "tableName"
153+
},
130154
{
131155
"widget-type": "textarea",
132156
"label": "Import Query",

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@
127127
{
128128
"label": "CloudSQL Properties",
129129
"properties": [
130+
{
131+
"widget-type": "hidden",
132+
"label": "Import Query Type",
133+
"name": "importQueryType",
134+
"widget-attributes": {
135+
"layout": "inline",
136+
"default": "importQuery",
137+
"options": [
138+
{
139+
"id": "importQuery",
140+
"label": "Native Query"
141+
},
142+
{
143+
"id": "tableName",
144+
"label": "Named Table"
145+
}
146+
]
147+
}
148+
},
149+
{
150+
"widget-type": "hidden",
151+
"label": "Table Name",
152+
"name": "tableName"
153+
},
130154
{
131155
"widget-type": "textarea",
132156
"label": "Import Query",

0 commit comments

Comments
 (0)