Skip to content

Commit 0590df4

Browse files
psainics (Abhishek Kumar) authored and committed
Resolved PR comments.
Added an Import Query Type property to the DB plugin, which is only reflected in the Redshift and PostgreSQL plugins.
1 parent e5c5794 commit 0590df4

File tree

19 files changed

+725
-47
lines changed

19 files changed

+725
-47
lines changed

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java

Lines changed: 71 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
import io.cdap.plugin.db.CommonSchemaReader;
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
25-
25+
import java.sql.Connection;
26+
import java.sql.DatabaseMetaData;
2627
import java.sql.ResultSet;
2728
import java.sql.ResultSetMetaData;
2829
import java.sql.SQLException;
@@ -56,33 +57,10 @@ public RedshiftSchemaReader(String sessionID) {
5657
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
5758
String typeName = metadata.getColumnTypeName(index);
5859
int columnType = metadata.getColumnType(index);
59-
60-
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
61-
return Schema.of(Schema.Type.STRING);
62-
}
63-
if (typeName.equalsIgnoreCase("INT")) {
64-
return Schema.of(Schema.Type.INT);
65-
}
66-
if (typeName.equalsIgnoreCase("BIGINT")) {
67-
return Schema.of(Schema.Type.LONG);
68-
}
69-
70-
// If it is a numeric type without precision then use the Schema of String to avoid any precision loss
71-
if (Types.NUMERIC == columnType) {
72-
int precision = metadata.getPrecision(index);
73-
if (precision == 0) {
74-
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
75-
+ "converting into STRING type to avoid any precision loss.",
76-
metadata.getColumnName(index),
77-
metadata.getColumnTypeName(index)));
78-
return Schema.of(Schema.Type.STRING);
79-
}
80-
}
81-
82-
if (typeName.equalsIgnoreCase("timestamp")) {
83-
return Schema.of(Schema.LogicalType.DATETIME);
84-
}
85-
60+
int precision = metadata.getPrecision(index);
61+
int scale = metadata.getScale(index);
62+
String columnName = metadata.getColumnName(index);
63+
getSchema(typeName, columnType , precision , scale , columnName);
8664
return super.getSchema(metadata, index);
8765
}
8866

@@ -114,4 +92,69 @@ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLExcepti
11492
return schemaFields;
11593
}
11694

95+
/**
96+
* Override: Fetches schema fields for a specific table using database metadata.
97+
*/
98+
@Override
99+
public List<Schema.Field> getSchemaFields(Connection connection, String tableName) throws SQLException {
100+
DatabaseMetaData dbMetaData = connection.getMetaData();
101+
String schema = null;
102+
String table = tableName;
103+
if (tableName.contains(".")) {
104+
String[] parts = tableName.split("\\.", 2);
105+
schema = parts[0];
106+
table = parts[1];
107+
}
108+
try (ResultSet columns = dbMetaData.getColumns(null, schema, table, null)) {
109+
List<Schema.Field> schemaFields = Lists.newArrayList();
110+
while (columns.next()) {
111+
String columnName = columns.getString("COLUMN_NAME");
112+
String typeName = columns.getString("TYPE_NAME");
113+
int columnType = columns.getInt("DATA_TYPE");
114+
int precision = columns.getInt("COLUMN_SIZE");
115+
int scale = columns.getInt("DECIMAL_DIGITS");
116+
int nullable = columns.getInt("NULLABLE");
117+
Schema columnSchema = getSchema(typeName, columnType, precision, scale, columnName);
118+
if (nullable == DatabaseMetaData.columnNullable) {
119+
columnSchema = Schema.nullableOf(columnSchema);
120+
}
121+
Schema.Field field = Schema.Field.of(columnName, columnSchema);
122+
schemaFields.add(field);
123+
}
124+
return schemaFields;
125+
}
126+
}
127+
128+
/**
129+
* Maps database column type information to a corresponding {@link Schema}.
130+
*
131+
* @param typeName the SQL type name
132+
* @param columnType the JDBC type code
133+
* @param precision the column precision
134+
* @param scale the column scale
135+
* @param columnName the column name
136+
* @return the mapped {@link Schema} type
137+
*/
138+
139+
public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName) {
140+
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
141+
return Schema.of(Schema.Type.STRING);
142+
}
143+
if ("INT".equalsIgnoreCase(typeName)) {
144+
return Schema.of(Schema.Type.INT);
145+
}
146+
if ("BIGINT".equalsIgnoreCase(typeName)) {
147+
return Schema.of(Schema.Type.LONG);
148+
}
149+
if (Types.NUMERIC == columnType && precision == 0) {
150+
LOG.warn(String.format("Field '%s' is a %s type without precision and scale," +
151+
" converting into STRING type to avoid any precision loss.",
152+
columnName, typeName));
153+
return Schema.of(Schema.Type.STRING);
154+
}
155+
if ("timestamp".equalsIgnoreCase(typeName)) {
156+
return Schema.of(Schema.LogicalType.DATETIME);
157+
}
158+
return Schema.of(Schema.Type.STRING);
159+
}
117160
}

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
package io.cdap.plugin.amazon.redshift;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.base.Strings;
2021
import io.cdap.cdap.api.annotation.Description;
2122
import io.cdap.cdap.api.annotation.Macro;
2223
import io.cdap.cdap.api.annotation.Metadata;
2324
import io.cdap.cdap.api.annotation.MetadataProperty;
2425
import io.cdap.cdap.api.annotation.Name;
2526
import io.cdap.cdap.api.annotation.Plugin;
2627
import io.cdap.cdap.etl.api.FailureCollector;
28+
import io.cdap.cdap.etl.api.PipelineConfigurer;
2729
import io.cdap.cdap.etl.api.batch.BatchSource;
2830
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
2931
import io.cdap.cdap.etl.api.connector.Connector;
@@ -36,6 +38,9 @@
3638
import io.cdap.plugin.util.DBUtils;
3739
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
3840

41+
import java.sql.Connection;
42+
import java.sql.DatabaseMetaData;
43+
import java.sql.SQLException;
3944
import java.util.Collections;
4045
import java.util.Map;
4146
import javax.annotation.Nullable;
@@ -59,6 +64,21 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
5964
this.redshiftSourceConfig = redshiftSourceConfig;
6065
}
6166

67+
@Override
68+
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
69+
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
70+
if (!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery")) {
71+
if ((Strings.isNullOrEmpty(sourceConfig.getTableName()))
72+
&& (Strings.isNullOrEmpty(sourceConfig.getImportQuery()))) {
73+
collector.addFailure(
74+
"Either 'tableName' or 'importQuery' must be specified.",
75+
"Provide a value for either 'tableName' or 'importQuery' in the configuration."
76+
).withConfigProperty(sourceConfig.getTableName()).withConfigProperty(sourceConfig.getImportQuery());
77+
}
78+
}
79+
super.configurePipeline(pipelineConfigurer);
80+
}
81+
6282
@Override
6383
protected SchemaReader getSchemaReader() {
6484
return new RedshiftSchemaReader();

amazon-redshift-plugin/widgets/Redshift-batchsource.json

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,30 @@
108108
{
109109
"label": "SQL Query",
110110
"properties": [
111+
{
112+
"widget-type": "radio-group",
113+
"label": "Import Query Type",
114+
"name": "importQueryType",
115+
"widget-attributes": {
116+
"layout": "inline",
117+
"default": "importQuery",
118+
"options": [
119+
{
120+
"id": "importQuery",
121+
"label": "Native Query"
122+
},
123+
{
124+
"id": "tableName",
125+
"label": "Named Table"
126+
}
127+
]
128+
}
129+
},
130+
{
131+
"widget-type": "textbox",
132+
"label": "Table Name",
133+
"name": "tableName"
134+
},
111135
{
112136
"widget-type": "textarea",
113137
"label": "Import Query",
@@ -229,6 +253,30 @@
229253
}
230254
]
231255
},
256+
{
257+
"name": "ImportQuery",
258+
"condition": {
259+
"expression": "importQueryType != 'tableName'"
260+
},
261+
"show": [
262+
{
263+
"type": "property",
264+
"name": "importQuery"
265+
}
266+
]
267+
},
268+
{
269+
"name": "NativeTableName",
270+
"condition": {
271+
"expression": "importQueryType == 'tableName'"
272+
},
273+
"show": [
274+
{
275+
"type": "property",
276+
"name": "tableName"
277+
}
278+
]
279+
}
232280
],
233281
"jump-config": {
234282
"datasets": [

cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@
127127
{
128128
"label": "CloudSQL Properties",
129129
"properties": [
130+
{
131+
"widget-type": "hidden",
132+
"label": "Import Query Type",
133+
"name": "importQueryType",
134+
"widget-attributes": {
135+
"layout": "inline",
136+
"default": "importQuery",
137+
"options": [
138+
{
139+
"id": "importQuery",
140+
"label": "Native Query"
141+
},
142+
{
143+
"id": "tableName",
144+
"label": "Named Table"
145+
}
146+
]
147+
}
148+
},
149+
{
150+
"widget-type": "hidden",
151+
"label": "Table Name",
152+
"name": "tableName"
153+
},
130154
{
131155
"widget-type": "textarea",
132156
"label": "Import Query",
@@ -251,6 +275,30 @@
251275
"name": "port"
252276
}
253277
]
278+
},
279+
{
280+
"name": "ImportQuery",
281+
"condition": {
282+
"expression": "importQueryType == 'importQuery'"
283+
},
284+
"show": [
285+
{
286+
"type": "property",
287+
"name": "importQuery"
288+
}
289+
]
290+
},
291+
{
292+
"name": "NativeTableName",
293+
"condition": {
294+
"expression": "importQueryType == 'tableName'"
295+
},
296+
"show": [
297+
{
298+
"type": "property",
299+
"name": "tableName"
300+
}
301+
]
254302
}
255303
],
256304
"jump-config": {

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@
127127
{
128128
"label": "CloudSQL Properties",
129129
"properties": [
130+
{
131+
"widget-type": "hidden",
132+
"label": "Import Query Type",
133+
"name": "importQueryType",
134+
"widget-attributes": {
135+
"layout": "inline",
136+
"default": "importQuery",
137+
"options": [
138+
{
139+
"id": "importQuery",
140+
"label": "Native Query"
141+
},
142+
{
143+
"id": "tableName",
144+
"label": "Named Table"
145+
}
146+
]
147+
}
148+
},
149+
{
150+
"widget-type": "hidden",
151+
"label": "Table Name",
152+
"name": "tableName"
153+
},
130154
{
131155
"widget-type": "textarea",
132156
"label": "Import Query",
@@ -255,6 +279,30 @@
255279
"name": "port"
256280
}
257281
]
282+
},
283+
{
284+
"name": "ImportQuery",
285+
"condition": {
286+
"expression": "importQueryType == 'importQuery'"
287+
},
288+
"show": [
289+
{
290+
"type": "property",
291+
"name": "importQuery"
292+
}
293+
]
294+
},
295+
{
296+
"name": "NativeTableName",
297+
"condition": {
298+
"expression": "importQueryType == 'tableName'"
299+
},
300+
"show": [
301+
{
302+
"type": "property",
303+
"name": "tableName"
304+
}
305+
]
258306
}
259307
],
260308
"jump-config": {

0 commit comments

Comments
 (0)