
Use AWS v2 SDK for S3 in product tests #26246

Merged · 1 commit · Jul 23, 2025
51 changes: 25 additions & 26 deletions testing/trino-product-tests/pom.xml
@@ -19,16 +19,6 @@
</properties>

<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -116,12 +106,6 @@
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hive</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
@@ -162,16 +146,6 @@
<dependency>
<groupId>io.trino.tempto</groupId>
<artifactId>tempto-core</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
@@ -224,11 +198,36 @@
<artifactId>testng</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-core</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>

S3ClientFactory.java
@@ -13,17 +13,15 @@
*/
package io.trino.tests.product.deltalake;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.Region;
import io.trino.testing.minio.MinioClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

import java.net.URI;

import static io.trino.testing.SystemEnvironmentUtils.requireEnv;
import static io.trino.testing.minio.MinioClient.DEFAULT_MINIO_ACCESS_KEY;
@@ -34,7 +32,9 @@ final class S3ClientFactory
public static final String AWS_S3_SERVER_TYPE = "aws";
public static final String MINIO_S3_SERVER_TYPE = "minio";

public AmazonS3 createS3Client(String serverType)
private S3ClientFactory() {}

public static S3Client createS3Client(String serverType)
{
return switch (serverType) {
case AWS_S3_SERVER_TYPE -> createAwsS3Client();
@@ -43,25 +43,22 @@ public AmazonS3 createS3Client(String serverType)
};
}

private AmazonS3 createAwsS3Client()
private static S3Client createAwsS3Client()
{
String region = requireEnv("AWS_REGION");
return AmazonS3Client.builder().withRegion(region).build();
return S3Client.builder().region(Region.of(region)).build();
}

private AmazonS3 createMinioS3Client()
private static S3Client createMinioS3Client()
{
AWSCredentials credentials = new BasicAWSCredentials(DEFAULT_MINIO_ACCESS_KEY, DEFAULT_MINIO_SECRET_KEY);
AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
ClientConfiguration clientConfiguration = new ClientConfiguration()
.withProtocol(Protocol.HTTP)
.withSignerOverride("AWSS3V4SignerType");
AwsCredentials credentials = AwsBasicCredentials.create(DEFAULT_MINIO_ACCESS_KEY, DEFAULT_MINIO_SECRET_KEY);
AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(credentials);

return AmazonS3Client.builder()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(MinioClient.DEFAULT_MINIO_ENDPOINT, Region.US_East_2.name()))
.withPathStyleAccessEnabled(true)
.withClientConfiguration(clientConfiguration)
.withCredentials(credentialsProvider)
return S3Client.builder()
.endpointOverride(URI.create(MinioClient.DEFAULT_MINIO_ENDPOINT))
.region(Region.US_EAST_2)
.forcePathStyle(true)
.credentialsProvider(credentialsProvider)
.build();
}
}
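
For context, a minimal sketch of how the refactored factory is now consumed (the example class name and bucket below are placeholders, not part of this PR). The v2 `S3Client` is `AutoCloseable` and holds pooled HTTP connections, which is why each test class gains an `@AfterMethodWithContext` that closes it:

```java
import software.amazon.awssdk.services.s3.S3Client;

import static io.trino.tests.product.deltalake.S3ClientFactory.MINIO_S3_SERVER_TYPE;
import static io.trino.tests.product.deltalake.S3ClientFactory.createS3Client;

// Illustrative only: mirrors the setup/cleanUp pattern the tests in this PR follow.
public class S3ClientFactoryUsageExample
{
    public static void main(String[] args)
    {
        // Obtain the SDK v2 client from the static factory and release it when done.
        try (S3Client s3 = createS3Client(MINIO_S3_SERVER_TYPE)) {
            s3.listObjectsV2(request -> request.bucket("example-bucket").prefix("some/prefix/"))
                    .contents()
                    .forEach(object -> System.out.println(object.key()));
        }
    }
}
```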

TestDeltaLakeActiveFilesCache.java
@@ -13,17 +13,19 @@
*/
package io.trino.tests.product.deltalake;

import com.amazonaws.services.s3.AmazonS3;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.trino.tempto.AfterMethodWithContext;
import io.trino.tempto.BeforeMethodWithContext;
import org.testng.annotations.Test;
import software.amazon.awssdk.services.s3.S3Client;

import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.deltalake.S3ClientFactory.createS3Client;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.removeS3Directory;
import static io.trino.tests.product.utils.QueryExecutors.onDelta;
@@ -38,12 +40,19 @@ public class TestDeltaLakeActiveFilesCache
@Named("s3.server_type")
private String s3ServerType;

private AmazonS3 s3;
private S3Client s3;

@BeforeMethodWithContext
public void setup()
{
s3 = new S3ClientFactory().createS3Client(s3ServerType);
s3 = createS3Client(s3ServerType);
}

@AfterMethodWithContext
public void cleanUp()
{
s3.close();
s3 = null;
}

@Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})

TestDeltaLakeChangeDataFeedCompatibility.java
@@ -13,16 +13,17 @@
*/
package io.trino.tests.product.deltalake;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.trino.tempto.AfterMethodWithContext;
import io.trino.tempto.BeforeMethodWithContext;
import io.trino.tempto.assertions.QueryAssert.Row;
import io.trino.testng.services.Flaky;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;

import java.util.List;
import java.util.Map;
@@ -38,6 +39,7 @@
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_113;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.deltalake.S3ClientFactory.createS3Client;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry;
@@ -55,12 +57,19 @@ public class TestDeltaLakeChangeDataFeedCompatibility
@Named("s3.server_type")
private String s3ServerType;

private AmazonS3 s3Client;
private S3Client s3Client;

@BeforeMethodWithContext
public void setup()
{
s3Client = new S3ClientFactory().createS3Client(s3ServerType);
s3Client = createS3Client(s3ServerType);
}

@AfterMethodWithContext
public void cleanUp()
{
s3Client.close();
s3Client = null;
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingModeDataProvider")
@@ -886,7 +895,7 @@ private void assertThereIsNoCdfFileGenerated(String tableName, String tablePrope
private void assertThatThereIsNoChangeDataFiles(String tableName)
{
String prefix = "databricks-compatibility-test-" + tableName + "/_change_data/";
ListObjectsV2Result listResult = s3Client.listObjectsV2(bucketName, prefix);
assertThat(listResult.getObjectSummaries()).isEmpty();
ListObjectsV2Response response = s3Client.listObjectsV2(request -> request.bucket(bucketName).prefix(prefix));
assertThat(response.contents()).isEmpty();
}
}
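
For reference, a minimal sketch of the listing-call migration applied above; the bucket name is a placeholder and `S3Client.create()` merely stands in for however the client is obtained. The v1 convenience overload `listObjectsV2(bucket, prefix)` becomes a consumer-builder call in v2, and `getObjectSummaries()` becomes `contents()`:

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;

// Illustrative only, not part of this PR.
public class ListObjectsMigrationExample
{
    public static void main(String[] args)
    {
        try (S3Client s3 = S3Client.create()) {
            // v2 replacement for s3Client.listObjectsV2(bucketName, prefix)
            ListObjectsV2Response response = s3.listObjectsV2(request -> request
                    .bucket("example-bucket")
                    .prefix("databricks-compatibility-test-example/_change_data/"));
            // v2 replacement for listResult.getObjectSummaries()
            System.out.println(response.contents().size());
        }
    }
}
```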

TestDeltaLakeCheckpointsCompatibility.java
@@ -13,19 +13,19 @@
*/
package io.trino.tests.product.deltalake;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.trino.tempto.AfterMethodWithContext;
import io.trino.tempto.BeforeMethodWithContext;
import io.trino.tempto.assertions.QueryAssert.Row;
import io.trino.tempto.query.QueryResult;
import io.trino.testng.services.Flaky;
import io.trino.tests.product.deltalake.util.DatabricksVersion;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.S3Object;

import java.math.BigDecimal;
import java.util.List;
@@ -43,6 +43,7 @@
import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS_154;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.deltalake.S3ClientFactory.createS3Client;
import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertLastEntryIsCheckpointed;
import static io.trino.tests.product.deltalake.TransactionLogAssertions.assertTransactionLogVersion;
import static io.trino.tests.product.deltalake.util.DatabricksVersion.DATABRICKS_113_RUNTIME_VERSION;
@@ -62,16 +63,23 @@ public class TestDeltaLakeCheckpointsCompatibility
@Named("s3.server_type")
private String s3ServerType;

private AmazonS3 s3;
private S3Client s3;
private Optional<DatabricksVersion> databricksRuntimeVersion;

@BeforeMethodWithContext
public void setup()
{
s3 = new S3ClientFactory().createS3Client(s3ServerType);
s3 = createS3Client(s3ServerType);
databricksRuntimeVersion = getDatabricksRuntimeVersion();
}

@AfterMethodWithContext
public void cleanUp()
{
s3.close();
s3 = null;
}

@Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
public void testSparkCanReadTrinoCheckpoint()
{
@@ -731,13 +739,9 @@ private List<String> listSidecarFiles(String bucketName, String tableDirectory)

private List<String> listS3Directory(String bucketName, String directory)
{
ImmutableList.Builder<String> result = ImmutableList.builder();
ObjectListing listing = s3.listObjects(bucketName, directory);
do {
listing.getObjectSummaries().stream().map(S3ObjectSummary::getKey).forEach(result::add);
listing = s3.listNextBatchOfObjects(listing);
}
while (listing.isTruncated());
return result.build();
return s3.listObjectsV2Paginator(request -> request.bucket(bucketName).prefix(directory))
.contents().stream()
.map(S3Object::key)
.toList();
}
}
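
A small illustrative sketch of the paginator used above, with a placeholder bucket and prefix: `listObjectsV2Paginator` returns an iterable that fetches continuation pages lazily, which is what replaces the manual `do/while` over `listNextBatchOfObjects` from SDK v1. The pages can also be consumed one at a time when page-level metadata is needed:

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;

// Illustrative only, not part of this PR.
public class PaginatorExample
{
    public static void main(String[] args)
    {
        try (S3Client s3 = S3Client.create()) {
            // Each iteration yields one ListObjectsV2Response page; continuation
            // requests are issued automatically as the iterable is consumed.
            for (ListObjectsV2Response page : s3.listObjectsV2Paginator(
                    request -> request.bucket("example-bucket").prefix("example/dir/"))) {
                page.contents().forEach(object -> System.out.println(object.key()));
            }
        }
    }
}
```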

TestDeltaLakeCloneTableCompatibility.java
@@ -13,14 +13,16 @@
*/
package io.trino.tests.product.deltalake;

import com.amazonaws.services.s3.AmazonS3;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.trino.tempto.AfterMethodWithContext;
import io.trino.tempto.BeforeMethodWithContext;
import io.trino.tempto.assertions.QueryAssert.Row;
import io.trino.testng.services.Flaky;
import org.testng.annotations.Test;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.S3Object;

import java.sql.Date;
import java.util.List;
@@ -31,6 +33,7 @@
import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.deltalake.S3ClientFactory.createS3Client;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH;
import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry;
@@ -46,13 +49,20 @@ public class TestDeltaLakeCloneTableCompatibility
@Named("s3.server_type")
private String s3ServerType;

private AmazonS3 s3;
private S3Client s3;

@BeforeMethodWithContext
public void setup()
{
super.setUp();
s3 = new S3ClientFactory().createS3Client(s3ServerType);
s3 = createS3Client(s3ServerType);
}

@AfterMethodWithContext
public void cleanUp()
{
s3.close();
s3 = null;
}

@Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
@@ -546,9 +556,11 @@ private List<String> getToBeVacuumedDataFilesFromDryRun(String tableName)

private List<String> getFilesFromTableDirectory(String directory)
{
return s3.listObjectsV2(bucketName, directory).getObjectSummaries().stream()
.filter(s3ObjectSummary -> !s3ObjectSummary.getKey().contains("/_delta_log"))
.map(s3ObjectSummary -> format("s3://%s/%s", bucketName, s3ObjectSummary.getKey()))
return s3.listObjectsV2Paginator(request -> request.bucket(bucketName).prefix(directory))
.contents().stream()
.map(S3Object::key)
.filter(key -> !key.contains("/_delta_log"))
.map(key -> format("s3://%s/%s", bucketName, key))
.collect(toImmutableList());
}
