Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.3</version>
<version>3.13.30</version>
</dependency>

<!--junit for unit test-->
Expand Down
2 changes: 1 addition & 1 deletion pom_confluent.xml
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.3</version>
<version>3.13.30</version>
</dependency>

<!--junit for unit test-->
Expand Down
48 changes: 18 additions & 30 deletions src/test/java/com/snowflake/kafka/connector/ConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;
import static com.snowflake.kafka.connector.internal.TestUtils.getConf;

import com.snowflake.kafka.connector.internal.TestUtils;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import net.snowflake.client.jdbc.SnowflakeSQLException;

import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -100,15 +99,6 @@ static Map<String, String> getCorrectConfig() {
return config;
}

@After
public void cleanup() throws SnowflakeSQLException {
try {
TestUtils.resetProxyParametersInJDBC();
} catch (SnowflakeSQLException ex) {
Assert.fail("Cannot reset proxy parameters in:" + this.getClass().getName());
}
}

@Test
public void testValidateErrorConfig() {
Map<String, ConfigValue> validateMap = toValidateMap(getErrorConfig());
Expand Down Expand Up @@ -280,8 +270,7 @@ public void testProxyHostPortConfig() {
Map<String, String> config = getCorrectConfig();
config.put(SnowflakeSinkConnectorConfig.JVM_PROXY_HOST, "localhost");
config.put(SnowflakeSinkConnectorConfig.JVM_PROXY_PORT, "8080");
Map<String, ConfigValue> validateMap = toValidateMap(config);
assertPropHasError(validateMap, new String[] {});
Utils.validateProxySetting(config);
}

@Test
Expand All @@ -290,13 +279,13 @@ public void testErrorProxyUsernameConfig() {
config.put(SnowflakeSinkConnectorConfig.JVM_PROXY_HOST, "localhost");
config.put(SnowflakeSinkConnectorConfig.JVM_PROXY_PORT, "8080");
config.put(SnowflakeSinkConnectorConfig.JVM_PROXY_USERNAME, "user");
Map<String, ConfigValue> validateMap = toValidateMap(config);
assertPropHasError(
validateMap,
new String[] {
SnowflakeSinkConnectorConfig.JVM_PROXY_USERNAME,
SnowflakeSinkConnectorConfig.JVM_PROXY_PASSWORD
});
try {
Utils.validateProxySetting(config);
} catch (SnowflakeKafkaConnectorException e) {
Assert.assertEquals("0023", e.getCode());
return;
}
Assert.fail();
}

@Test
Expand All @@ -305,13 +294,13 @@ public void testErrorProxyPasswordConfig() {
config.put(SnowflakeSinkConnectorConfig.JVM_PROXY_HOST, "localhost");
config.put(SnowflakeSinkConnectorConfig.JVM_PROXY_PORT, "8080");
config.put(SnowflakeSinkConnectorConfig.JVM_PROXY_PASSWORD, "pass");
Map<String, ConfigValue> validateMap = toValidateMap(config);
assertPropHasError(
validateMap,
new String[] {
SnowflakeSinkConnectorConfig.JVM_PROXY_USERNAME,
SnowflakeSinkConnectorConfig.JVM_PROXY_PASSWORD
});
try {
Utils.validateProxySetting(config);
} catch (SnowflakeKafkaConnectorException e) {
Assert.assertEquals("0023", e.getCode());
return;
}
Assert.fail();
}

@Test
Expand All @@ -321,8 +310,7 @@ public void testProxyUsernamePasswordConfig() {
config.put(SnowflakeSinkConnectorConfig.JVM_PROXY_PORT, "3128");
config.put(SnowflakeSinkConnectorConfig.JVM_PROXY_USERNAME, "admin");
config.put(SnowflakeSinkConnectorConfig.JVM_PROXY_PASSWORD, "test");
Map<String, ConfigValue> validateMap = toValidateMap(config);
assertPropHasError(validateMap, new String[] {});
Utils.validateProxySetting(config);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -14,12 +13,7 @@ public class SinkTaskProxyIT {

@After
public void testCleanup() {
try {
TestUtils.resetProxyParametersInJDBC();
TestUtils.resetProxyParametersInJVM();
} catch (SnowflakeSQLException ex) {
Assert.fail("Cannot reset proxy parameters in:" + this.getClass().getName());
}
TestUtils.resetProxyParametersInJVM();
}

@Test(expected = SnowflakeKafkaConnectorException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,6 @@ public void testInternalStageWithProxy() throws SnowflakeSQLException {
assert files1.size() == 2;
System.out.println(Logging.logMessage("Time: {} ms", (System.currentTimeMillis() - startTime)));
proxyConnectionService.dropStage(proxyStage);

// Reset proxy configuration
TestUtils.resetProxyParametersInJDBC();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
public class ResetProxyConfigExec extends Logging {
public static void main(String[] args) throws SnowflakeSQLException {
System.out.println("ResetProxyConfigExec::Start wiping Proxy config");
TestUtils.resetProxyParametersInJDBC();
TestUtils.resetProxyParametersInJVM();
System.out.println("ResetProxyConfigExec::Proxy Parameters reset in JVM in JDBC");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import net.snowflake.client.core.HttpUtil;
import net.snowflake.client.core.SFSessionProperty;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;

Expand Down Expand Up @@ -274,17 +271,6 @@ public static SnowflakeConnectionService getConnectionService(Map<String, String
return SnowflakeConnectionServiceFactory.builder().setProperties(configuration).build();
}

/**
* Reset proxy parameters in JDBC. JDBC's useProxy parameter is static member, needs to be reset
* after every test run. i.e needs to run after every new connection is set which modifies the
* proxy parameter.
*/
public static void resetProxyParametersInJDBC() throws SnowflakeSQLException {
Map<SFSessionProperty, Object> resetProxy = new EnumMap(SFSessionProperty.class);
resetProxy.put(SFSessionProperty.USE_PROXY, false);
HttpUtil.configureCustomProxyProperties(resetProxy);
}

/**
* Reset proxy parameters in JVM which is enabled during starting a sink Task. Call this if your
* test/code executes the Utils.enableJVMProxy function
Expand Down