Skip to content

add refresh credentials property to loadTableResult #2341

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ public String genericTables(Namespace ns) {
"polaris", "v1", prefix, "namespaces", RESTUtil.encodeNamespace(ns), "generic-tables");
}

public String credentialsPath(TableIdentifier ident) {
return SLASH.join(
"v1",
prefix,
"namespaces",
RESTUtil.encodeNamespace(ident.namespace()),
"tables",
RESTUtil.encodeString(ident.name()),
"credentials");
}

public String genericTable(TableIdentifier ident) {
return SLASH.join(
"polaris",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.BadRequestException;
Expand Down Expand Up @@ -71,7 +72,9 @@
import org.apache.polaris.core.persistence.resolver.ResolverFactory;
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
import org.apache.polaris.core.rest.PolarisEndpoints;
import org.apache.polaris.core.rest.PolarisResourcePaths;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.apache.polaris.core.storage.StorageAccessProperty;
import org.apache.polaris.service.catalog.AccessDelegationMode;
import org.apache.polaris.service.catalog.CatalogPrefixParser;
import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
Expand Down Expand Up @@ -420,16 +423,45 @@ public Response loadTable(
.loadTableIfStale(tableIdentifier, ifNoneMatch, snapshots)
.orElseThrow(() -> new WebApplicationException(Response.Status.NOT_MODIFIED));
} else {
response =
LoadTableResponse originalResponse =
catalog
.loadTableWithAccessDelegationIfStale(tableIdentifier, ifNoneMatch, snapshots)
.orElseThrow(() -> new WebApplicationException(Response.Status.NOT_MODIFIED));

if (delegationModes.contains(VENDED_CREDENTIALS)) {
response =
injectRefreshVendedCredentialProperties(
originalResponse,
new PolarisResourcePaths(prefix).credentialsPath(tableIdentifier));
} else {
response = originalResponse;
}
}

return tryInsertETagHeader(Response.ok(response), response, namespace, table).build();
});
}

private LoadTableResponse injectRefreshVendedCredentialProperties(
LoadTableResponse originalResponse, String credentialsEndpoint) {
LoadTableResponse.Builder loadResponseBuilder =
LoadTableResponse.builder().withTableMetadata(originalResponse.tableMetadata());
loadResponseBuilder.addAllConfig(originalResponse.config());
loadResponseBuilder.addAllCredentials(originalResponse.credentials());
loadResponseBuilder.addConfig(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about azure ? do we want to do it in follow-up ? also should we only inject this when creds are for aws ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I think enabling the Azure refresh properties currently causes Azure credential vending to immediately fail due to apache/iceberg#13733, so could maybe wait for that fix to release?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also should we only inject this when creds are for aws?

Could maybe lift the AWS refresh check below to loadTable itself, to avoid e.g. reconstructing the LoadTableResponse if not needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to provide the URL even if it's not AWS that way the user can override AwsClientProperties.REFRESH_CREDENTIALS_ENABLED if he is working with a system that he knows supports it despite the server not being updated to reflect this yet. For example, an updated Azure iceberg SDK like mentioned above.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AWS cred refresh properties are :

client.refresh-credentials-enabled
client.refresh-credentials-endpoint

Azure credential refresh properties are :

adls.refresh-credentials-enabled
adls.refresh-credentials-endpoint

I am not sure how can one re-use aws cred endpoint prop to enable azure ? nevertheless why would server send back azure properties for an aws storage ?

AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT, credentialsEndpoint);
// Only enable credential refresh for currently supported credential types
if (originalResponse.credentials().stream()
.anyMatch(
credential ->
credential
.config()
.containsKey(StorageAccessProperty.AWS_SECRET_KEY.getPropertyName()))) {
loadResponseBuilder.addConfig(AwsClientProperties.REFRESH_CREDENTIALS_ENABLED, "true");
}
return loadResponseBuilder.build();
}

@Override
public Response tableExists(
String prefix,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@

package org.apache.polaris.service.catalog.iceberg;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.iceberg.Schema;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.types.Types;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
import org.apache.polaris.core.admin.model.BearerAuthenticationParameters;
Expand All @@ -43,6 +49,7 @@
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Strings;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -242,4 +249,60 @@ private static Stream<Arguments> paginationTestCases() {
Arguments.of("5", 5),
Arguments.of("5", 10));
}

@Test
void testLoadTableReturnsCredentialsRefreshEndpoint() throws IOException {
try (InMemoryCatalog inMemoryCatalog = new InMemoryCatalog()) {
// Initialize and replace the default handler with one backed by in-memory catalog
inMemoryCatalog.initialize("inMemory", Map.of());
mockCatalogAdapter(inMemoryCatalog);

// Create a namespace and table
String namespace = "test_ns";
String tableName = "test_table";
inMemoryCatalog.createNamespace(Namespace.of(namespace));

Schema schema =
new Schema(
Types.NestedField.required(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "name", Types.StringType.get()));

CreateTableRequest createTableRequest =
CreateTableRequest.builder().withName(tableName).withSchema(schema).build();

// Create the table first
catalogAdapter.createTable(
FEDERATED_CATALOG_NAME,
namespace,
createTableRequest,
"vended-credentials",
testServices.realmContext(),
testServices.securityContext());

// Load the table with vended credentials access delegation mode
LoadTableResponse response =
(LoadTableResponse)
catalogAdapter
.loadTable(
FEDERATED_CATALOG_NAME,
namespace,
tableName,
"vended-credentials",
null,
null,
testServices.realmContext(),
testServices.securityContext())
.getEntity();

// Verify that the response contains the credentials refresh endpoint configuration
assertThat(response.config()).containsKey(AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT);

String expectedEndpoint =
String.format(
"v1/%s/namespaces/%s/tables/%s/credentials",
FEDERATED_CATALOG_NAME, namespace, tableName);
assertThat(response.config().get(AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT))
.isEqualTo(expectedEndpoint);
}
}
}