Skip to content

Commit db1afd4

Browse files
authored
Merge pull request #77 from AdaptiveScale/feature/db-argument-setter
Database-based Argument Setter
2 parents c183333 + 4d1eff1 commit db1afd4

File tree

5 files changed

+473
-0
lines changed

5 files changed

+473
-0
lines changed
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright © 2020 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.db.batch.action;
18+
19+
import io.cdap.cdap.etl.api.FailureCollector;
20+
import io.cdap.cdap.etl.api.PipelineConfigurer;
21+
import io.cdap.cdap.etl.api.StageConfigurer;
22+
import io.cdap.cdap.etl.api.action.Action;
23+
import io.cdap.cdap.etl.api.action.ActionContext;
24+
import io.cdap.cdap.etl.api.action.SettableArguments;
25+
import io.cdap.plugin.db.ConnectionConfig;
26+
import io.cdap.plugin.util.DBUtils;
27+
import io.cdap.plugin.util.DriverCleanup;
28+
29+
import java.sql.Connection;
30+
import java.sql.Driver;
31+
import java.sql.DriverManager;
32+
import java.sql.ResultSet;
33+
import java.sql.SQLException;
34+
import java.sql.Statement;
35+
import java.util.Properties;
36+
37+
/**
38+
* Action that converts db column into pipeline argument.
39+
*/
40+
public class AbstractDBArgumentSetter extends Action {
41+
42+
private static final String JDBC_PLUGIN_ID = "driver";
43+
private final ArgumentSetterConfig config;
44+
45+
public AbstractDBArgumentSetter(ArgumentSetterConfig config) {
46+
this.config = config;
47+
}
48+
49+
@Override
50+
public void run(ActionContext context) throws Exception {
51+
Class<? extends Driver> driverClass = context.loadPluginClass(JDBC_PLUGIN_ID);
52+
FailureCollector failureCollector = context.getFailureCollector();
53+
SettableArguments settableArguments = context.getArguments();
54+
processArguments(driverClass, failureCollector, settableArguments);
55+
}
56+
57+
@Override
58+
public void configurePipeline(PipelineConfigurer pipelineConfigurer)
59+
throws IllegalArgumentException {
60+
DBUtils.validateJDBCPluginPipeline(pipelineConfigurer, config, JDBC_PLUGIN_ID);
61+
Class<? extends Driver> driverClass = DBUtils.getDriverClass(
62+
pipelineConfigurer, config, ConnectionConfig.JDBC_PLUGIN_TYPE);
63+
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
64+
FailureCollector collector = stageConfigurer.getFailureCollector();
65+
config.validate(collector);
66+
try {
67+
processArguments(driverClass, collector, null);
68+
} catch (SQLException e) {
69+
collector.addFailure("SQL error while executing query: " + e.getMessage(), null)
70+
.withStacktrace(e.getStackTrace());
71+
} catch (IllegalAccessException | InstantiationException e) {
72+
collector.addFailure("Unable to instantiate JDBC driver: " + e.getMessage(), null)
73+
.withStacktrace(e.getStackTrace());
74+
} catch (Exception e) {
75+
collector.addFailure(e.getMessage(), null).withStacktrace(e.getStackTrace());
76+
}
77+
}
78+
79+
/**
80+
* Creates connection to database. Reads row from database based on selection conditions and
81+
* depending on whether settable arguments is provided or not set the argument from row columns.
82+
*
83+
* @param driverClass {@link Class<? extends Driver>}
84+
* @param failureCollector {@link FailureCollector}
85+
* @param settableArguments {@link SettableArguments}
86+
* @throws SQLException is raised when there is sql related exception
87+
* @throws IllegalAccessException is raised when there is access related exception
88+
* @throws InstantiationException is raised when there is class/driver issue
89+
*/
90+
private void processArguments(Class<? extends Driver> driverClass,
91+
FailureCollector failureCollector, SettableArguments settableArguments)
92+
throws SQLException, IllegalAccessException, InstantiationException {
93+
DriverCleanup driverCleanup;
94+
95+
driverCleanup = DBUtils.ensureJDBCDriverIsAvailable(driverClass, config.getConnectionString(),
96+
config.jdbcPluginName);
97+
Properties connectionProperties = new Properties();
98+
connectionProperties.putAll(config.getConnectionArguments());
99+
try {
100+
Connection connection = DriverManager
101+
.getConnection(config.getConnectionString(), connectionProperties);
102+
Statement statement = connection.createStatement();
103+
ResultSet resultSet = statement.executeQuery(config.getQuery());
104+
boolean hasRecord = resultSet.next();
105+
if (!hasRecord) {
106+
failureCollector.addFailure("No record found.",
107+
"The argument selection conditions must match only one record.");
108+
return;
109+
}
110+
if (settableArguments != null) {
111+
setArguments(resultSet, failureCollector, settableArguments);
112+
}
113+
if (resultSet.next()) {
114+
failureCollector
115+
.addFailure("More than one records found.",
116+
"The argument selection conditions must match only one record.");
117+
}
118+
} finally {
119+
driverCleanup.destroy();
120+
}
121+
}
122+
123+
/**
124+
* Converts column from jdbc results set into pipeline arguments
125+
*
126+
* @param resultSet - result set from db {@link ResultSet}
127+
* @param failureCollector - context failure collector @{link FailureCollector}
128+
* @param arguments - context argument setter {@link SettableArguments}
129+
* @throws SQLException - raises {@link SQLException} when configuration is not valid
130+
*/
131+
private void setArguments(ResultSet resultSet, FailureCollector failureCollector,
132+
SettableArguments arguments) throws SQLException {
133+
String[] columns = config.getArgumentsColumns().split(",");
134+
for (String column : columns) {
135+
arguments.set(column, resultSet.getString(column));
136+
}
137+
}
138+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright © 2020 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.db.batch.action;
18+
19+
import com.google.common.base.Strings;
20+
import io.cdap.cdap.api.annotation.Description;
21+
import io.cdap.cdap.api.annotation.Macro;
22+
import io.cdap.cdap.api.annotation.Name;
23+
import io.cdap.cdap.etl.api.FailureCollector;
24+
import io.cdap.plugin.db.ConnectionConfig;
25+
26+
/**
27+
* Config for ArgumentSetter reading from database
28+
*/
29+
public abstract class ArgumentSetterConfig extends ConnectionConfig {
30+
31+
public static final String DATABASE_NAME = "databaseName";
32+
public static final String TABLE_NAME = "tableName";
33+
public static final String ARGUMENT_SELECTION_CONDITIONS = "argumentSelectionConditions";
34+
public static final String ARGUMENTS_COLUMNS = "argumentsColumns";
35+
36+
@Name(ConnectionConfig.CONNECTION_STRING)
37+
@Description("JDBC connection string including database name.")
38+
@Macro
39+
public String connectionString;
40+
41+
@Name(DATABASE_NAME)
42+
@Description("The name of the database which contains\n"
43+
+ "the configuration table")
44+
@Macro
45+
String databaseName;
46+
47+
@Name(TABLE_NAME)
48+
@Description("The name of the table in the database\n"
49+
+ "containing the configurations for the pipeline")
50+
@Macro
51+
String tableName;
52+
53+
@Name(ARGUMENT_SELECTION_CONDITIONS)
54+
@Description("A set of conditions for identifying the\n"
55+
+ "arguments to run a pipeline. Users can\n"
56+
+ "specify multiple conditions in the format\n"
57+
+ "column1=<column1-value>;column2=<colum\n"
58+
+ "n2-value>. A particular use case for this\n"
59+
+ "would be feed=marketing AND\n"
60+
+ "date=20200427. The conditions specified\n"
61+
+ "should be logically ANDed to determine the\n"
62+
+ "arguments for a run. When the conditions are\n"
63+
+ "applied, the table should return exactly 1 row.\n"
64+
+ "If it doesn’t return any rows, or if it returns\n"
65+
+ "multiple rows, the pipeline should abort with\n"
66+
+ "appropriate errors. Typically, users should\n"
67+
+ "use macros in this field, so that they can\n"
68+
+ "specify the conditions at runtime.")
69+
@Macro
70+
String argumentSelectionConditions;
71+
72+
@Name(ARGUMENTS_COLUMNS)
73+
@Description("Names of the columns that contain the\n"
74+
+ "arguments for this run. The values of this\n"
75+
+ "columns in the row that satisfies the argument\n"
76+
+ "selection conditions determines the\n"
77+
+ "arguments for the pipeline run")
78+
@Macro
79+
String argumentsColumns;
80+
81+
public String getDatabaseName() {
82+
return databaseName;
83+
}
84+
85+
public String getTableName() {
86+
return tableName;
87+
}
88+
89+
public String getArgumentSelectionConditions() {
90+
return argumentSelectionConditions;
91+
}
92+
93+
public String getArgumentsColumns() {
94+
return argumentsColumns;
95+
}
96+
97+
public String getQuery() {
98+
if (this.getArgumentSelectionConditions() == null) {
99+
throw new IllegalArgumentException("Argument selection conditions are empty.");
100+
}
101+
String[] split = this.getArgumentSelectionConditions().split(";");
102+
String conditions = String.join(" AND ", split);
103+
104+
return String
105+
.format("SELECT %s FROM %s WHERE %s", this.getArgumentsColumns(), this.getTableName(),
106+
conditions);
107+
}
108+
109+
/**
110+
* Validates config input fields.
111+
*
112+
* @param collector context failure collector {@link FailureCollector}
113+
*/
114+
public void validate(FailureCollector collector) {
115+
if (!containsMacro(CONNECTION_STRING) && Strings.isNullOrEmpty(this.connectionString)) {
116+
collector.addFailure("Invalid connection string.", "Connection string cannot be empty.");
117+
}
118+
if (!containsMacro(ConnectionConfig.USER) && Strings.isNullOrEmpty(this.user)) {
119+
collector.addFailure("Invalid username.", "Username cannot be empty.");
120+
}
121+
if (!containsMacro(ConnectionConfig.PASSWORD) && Strings.isNullOrEmpty(this.password)) {
122+
collector.addFailure("Invalid password.", "Password cannot be empty.");
123+
}
124+
if (!containsMacro(DATABASE_NAME) && Strings.isNullOrEmpty(this.getDatabaseName())) {
125+
collector.addFailure("Invalid database.", "Valid database must be specified.");
126+
}
127+
if (!containsMacro(TABLE_NAME) && Strings.isNullOrEmpty(this.getTableName())) {
128+
collector.addFailure("Invalid table.", "Valid table must be specified.");
129+
}
130+
if (!containsMacro(ARGUMENTS_COLUMNS) && Strings.isNullOrEmpty(this.getArgumentsColumns())) {
131+
collector
132+
.addFailure("Invalid arguments columns.", "Arguments column names must be specified.");
133+
}
134+
if (!containsMacro(ARGUMENT_SELECTION_CONDITIONS) && Strings
135+
.isNullOrEmpty(this.getArgumentSelectionConditions())) {
136+
collector
137+
.addFailure("Invalid conditions.", "Filter conditions must be specified.");
138+
}
139+
collector.getOrThrowException();
140+
}
141+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Database Action
2+
3+
4+
Description
5+
-----------
6+
Action that reads a single row from a database table and sets the values of the selected columns as pipeline arguments.
7+
8+
9+
Use Case
10+
--------
11+
The action can be used whenever you want to pass data from database as arguments to a data pipeline.
12+
For example, you may want to read a particular table at runtime to set arguments for the pipeline.
13+
14+
15+
Properties
16+
----------
17+
**Driver Name:** Name of the JDBC driver to use.
18+
19+
**Connection String:** JDBC connection string including database name.
20+
21+
**Database:** Database name.
22+
23+
**Table:** Table name.
24+
25+
**Argument selection conditions:** A set of conditions for identifying the arguments to run a pipeline. Multiple conditions can be specified in the format
26+
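`column1=<column1-value>;column2=<column2-value>`. The conditions are combined with AND and must match exactly one row; otherwise the action reports an error.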
27+
28+
**Username:** User identity for connecting to the specified database.
29+
30+
**Password:** Password to use to connect to the specified database.
31+
32+
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
33+
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
34+
35+
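**Arguments column:** Comma-separated names of the columns that contain the arguments. Each column name is used as the argument name, and its value in the matching row is used as the argument value.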
36+
37+
Example
38+
-------
39+
Suppose you have a configuration management database and you want to read a particular table at runtime to set arguments for the pipeline, including the source configuration, the transformations, the sink configuration, etc.
40+
41+
```
42+
Driver Name: "postgres"
43+
Connection String: "jdbc:postgresql://localhost:5432/configuration"
44+
Database: "configuration"
45+
Table: "files"
46+
Argument selection conditions: "file_id=1"
47+
Arguments column: "file_name"
48+
```
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright © 2020 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.jdbc;
18+
19+
import io.cdap.cdap.api.annotation.Description;
20+
import io.cdap.cdap.api.annotation.Name;
21+
import io.cdap.cdap.api.annotation.Plugin;
22+
import io.cdap.cdap.etl.api.action.Action;
23+
import io.cdap.plugin.db.batch.action.AbstractDBArgumentSetter;
24+
import io.cdap.plugin.db.batch.action.ArgumentSetterConfig;
25+
26+
/**
27+
* Action that runs a db command
28+
*/
29+
@Plugin(type = Action.PLUGIN_TYPE)
30+
@Name("DatabaseArgumentSetter")
31+
@Description("Reads single record from database table and converts it to arguments for pipeline")
32+
public class DatabaseArgumentSetter extends AbstractDBArgumentSetter {
33+
34+
private final DatabaseArgumentSetterConfig config;
35+
36+
public DatabaseArgumentSetter(DatabaseArgumentSetterConfig config) {
37+
super(config);
38+
this.config = config;
39+
}
40+
41+
/**
42+
* Database action databaseActionConfig.
43+
*/
44+
public static class DatabaseArgumentSetterConfig extends ArgumentSetterConfig {
45+
46+
@Override
47+
public String getConnectionString() {
48+
return connectionString;
49+
}
50+
}
51+
}

0 commit comments

Comments
 (0)