83 changes: 42 additions & 41 deletions src/aws.cpp
@@ -54,46 +54,45 @@ static void InitAWSAPI() {
}
}

static void LogAWSRequest(ClientContext &context, std::shared_ptr<Aws::Http::HttpRequest> &req,
Aws::Http::HttpMethod &method) {
if (context.db) {
auto http_util = HTTPUtil::Get(*context.db);
auto aws_headers = req->GetHeaders();
auto http_headers = HTTPHeaders();
for (auto &header : aws_headers) {
http_headers.Insert(header.first.c_str(), header.second);
}
auto params = HTTPParams(http_util);
auto url = "https://" + req->GetUri().GetAuthority() + req->GetUri().GetPath();
const auto query_str = req->GetUri().GetQueryString();
if (!query_str.empty()) {
url += "?" + query_str;
}
RequestType type;
switch (method) {
case Aws::Http::HttpMethod::HTTP_GET:
type = RequestType::GET_REQUEST;
break;
case Aws::Http::HttpMethod::HTTP_HEAD:
type = RequestType::HEAD_REQUEST;
break;
case Aws::Http::HttpMethod::HTTP_DELETE:
type = RequestType::DELETE_REQUEST;
break;
case Aws::Http::HttpMethod::HTTP_POST:
type = RequestType::POST_REQUEST;
break;
case Aws::Http::HttpMethod::HTTP_PUT:
type = RequestType::PUT_REQUEST;
break;
default:
throw InvalidConfigurationException("Aws client cannot create request of type %s",
Aws::Http::HttpMethodMapper::GetNameForHttpMethod(method));
}
auto request = BaseRequest(type, url, http_headers, params);
request.params.logger = context.logger;
http_util.LogRequest(request, nullptr);
static void LogAWSHTTPRequest(ClientContext &context, std::shared_ptr<Aws::Http::HttpRequest> &req,
HTTPResponse &response, Aws::Http::HttpMethod &method) {
D_ASSERT(context.db);
auto http_util = HTTPUtil::Get(*context.db);
auto aws_headers = req->GetHeaders();
auto http_headers = HTTPHeaders();
for (auto &header : aws_headers) {
http_headers.Insert(header.first, header.second);
}
auto params = HTTPParams(http_util);
auto url = string("https://") + req->GetUri().GetAuthority() + req->GetUri().GetPath();
const auto query_str = req->GetUri().GetQueryString();
if (!query_str.empty()) {
url += "?" + query_str;
}
RequestType type;
switch (method) {
case Aws::Http::HttpMethod::HTTP_GET:
type = RequestType::GET_REQUEST;
break;
case Aws::Http::HttpMethod::HTTP_HEAD:
type = RequestType::HEAD_REQUEST;
break;
case Aws::Http::HttpMethod::HTTP_DELETE:
type = RequestType::DELETE_REQUEST;
break;
case Aws::Http::HttpMethod::HTTP_POST:
type = RequestType::POST_REQUEST;
break;
case Aws::Http::HttpMethod::HTTP_PUT:
type = RequestType::PUT_REQUEST;
break;
default:
throw InvalidConfigurationException("Aws client cannot create request of type %s",
Aws::Http::HttpMethodMapper::GetNameForHttpMethod(method));
}
auto request = BaseRequest(type, url, std::move(http_headers), params);
request.params.logger = context.logger;
http_util.LogRequest(request, response);
}

Aws::Client::ClientConfiguration AWSInput::BuildClientConfig() {
@@ -160,7 +159,6 @@ unique_ptr<HTTPResponse> AWSInput::ExecuteRequest(ClientContext &context, Aws::H
auto uri = BuildURI();
auto request = CreateSignedRequest(method, uri, headers, body);

LogAWSRequest(context, request, method);
auto httpClient = Aws::Http::CreateHttpClient(clientConfig);
auto response = httpClient->MakeRequest(request);
auto resCode = response->GetResponseCode();
@@ -180,13 +178,16 @@ unique_ptr<HTTPResponse> AWSInput::ExecuteRequest(ClientContext &context, Aws::H
result->reason = response->GetClientErrorMessage();
throw HTTPException(*result, result->reason);
}
for (auto &header : response->GetHeaders()) {
result->headers[header.first] = header.second;
}
Aws::StringStream resBody;
resBody << response->GetResponseBody().rdbuf();
result->body = resBody.str();

if (static_cast<uint16_t>(result->status) > 400) {
result->success = false;
}
LogAWSHTTPRequest(context, request, *result, method);
return result;
}

22 changes: 11 additions & 11 deletions src/catalog_api.cpp
@@ -140,7 +140,7 @@ bool IRCAPI::VerifySchemaExistence(ClientContext &context, IRCatalog &catalog, c
auto schema_name = GetEncodedSchemaName(namespace_items);

auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPathComponent(catalog.GetURLEncodedPrefix());
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);
bool execute_head =
@@ -153,7 +153,7 @@ bool IRCAPI::VerifyTableExistence(ClientContext &context, IRCatalog &catalog, co
auto schema_name = GetEncodedSchemaName(schema.namespace_items);

auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPathComponent(catalog.GetURLEncodedPrefix());
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);
url_builder.AddPathComponent("tables");
@@ -168,7 +168,7 @@ static unique_ptr<HTTPResponse> GetTableMetadata(ClientContext &context, IRCatal
auto schema_name = IRCAPI::GetEncodedSchemaName(schema.namespace_items);

auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPathComponent(catalog.GetURLEncodedPrefix());
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);
url_builder.AddPathComponent("tables");
@@ -210,7 +210,7 @@ vector<rest_api_objects::TableIdentifier> IRCAPI::GetTables(ClientContext &conte

do {
auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPathComponent(catalog.GetURLEncodedPrefix());
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);
url_builder.AddPathComponent("tables");
@@ -260,7 +260,7 @@ vector<IRCAPISchema> IRCAPI::GetSchemas(ClientContext &context, IRCatalog &catal
string page_token = "";
do {
auto endpoint_builder = catalog.GetBaseUrl();
endpoint_builder.AddPathComponent(catalog.prefix);
endpoint_builder.AddPathComponent(catalog.GetURLEncodedPrefix());
endpoint_builder.AddPathComponent("namespaces");
if (!parent.empty()) {
auto parent_name = GetSchemaName(parent);
@@ -316,7 +316,7 @@ vector<IRCAPISchema> IRCAPI::GetSchemas(ClientContext &context, IRCatalog &catal

void IRCAPI::CommitMultiTableUpdate(ClientContext &context, IRCatalog &catalog, const string &body) {
auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPathComponent(catalog.GetURLEncodedPrefix());
url_builder.AddPathComponent("transactions");
url_builder.AddPathComponent("commit");
HTTPHeaders headers(*context.db);
@@ -334,7 +334,7 @@ void IRCAPI::CommitTableUpdate(ClientContext &context, IRCatalog &catalog, const
auto schema_name = GetEncodedSchemaName(schema);

auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPathComponent(catalog.GetURLEncodedPrefix());
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);
url_builder.AddPathComponent("tables");
@@ -353,7 +353,7 @@ void IRCAPI::CommitTableDelete(ClientContext &context, IRCatalog &catalog, const
const string &table_name) {
auto schema_name = GetEncodedSchemaName(schema);
auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPathComponent(catalog.GetURLEncodedPrefix());
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);
url_builder.AddPathComponent("tables");
@@ -372,7 +372,7 @@ void IRCAPI::CommitTableDelete(ClientContext &context, IRCatalog &catalog, const

void IRCAPI::CommitNamespaceCreate(ClientContext &context, IRCatalog &catalog, string body) {
auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPathComponent(catalog.GetURLEncodedPrefix());
url_builder.AddPathComponent("namespaces");
HTTPHeaders headers(*context.db);
headers.Insert("Content-Type", "application/json");
@@ -387,7 +387,7 @@ void IRCAPI::CommitNamespaceCreate(ClientContext &context, IRCatalog &catalog, s
void IRCAPI::CommitNamespaceDrop(ClientContext &context, IRCatalog &catalog, vector<string> namespace_items) {
auto url_builder = catalog.GetBaseUrl();
auto schema_name = GetEncodedSchemaName(namespace_items);
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPathComponent(catalog.GetURLEncodedPrefix());
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);

@@ -407,7 +407,7 @@ rest_api_objects::LoadTableResult IRCAPI::CommitNewTable(ClientContext &context,
auto &ic_schema = table->schema.Cast<IRCSchemaEntry>();
auto table_namespace = GetEncodedSchemaName(ic_schema.namespace_items);
auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPathComponent(catalog.GetURLEncodedPrefix());
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(table_namespace);
url_builder.AddPathComponent("tables");
1 change: 1 addition & 0 deletions src/include/storage/irc_catalog.hpp
@@ -96,6 +96,7 @@ class IRCatalog : public Catalog {
//! Whether or not this is an in-memory Iceberg database
bool InMemory() override;
string GetDBPath() override;
string GetURLEncodedPrefix();

static string GetOnlyMergeOnReadSupportedErrorMessage(const string &table_name, const string &property,
const string &property_value);
9 changes: 9 additions & 0 deletions src/storage/irc_catalog.cpp
@@ -557,6 +557,15 @@ unique_ptr<Catalog> IRCatalog::Attach(optional_ptr<StorageExtensionInfo> storage
return std::move(catalog);
}

string IRCatalog::GetURLEncodedPrefix() {
// if auth handler is SigV4, we are sending the request through
// AWS, which will encode the prefix for us.
if (auth_handler->type == IRCAuthorizationType::SIGV4) {
return prefix;
}
return StringUtil::URLEncode(prefix);
}
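
For context, here is a minimal standalone sketch (not the extension's actual code) of the behaviour this helper distinguishes: under SigV4 the raw prefix is handed to the AWS SDK, which performs its own encoding, while for other auth handlers the prefix is percent-encoded before being appended to the URL. The prefix value below is hypothetical, and PercentEncode is a plain RFC 3986-style stand-in for DuckDB's StringUtil::URLEncode.

// Sketch only: illustrates the two encoding paths of GetURLEncodedPrefix.
#include <cctype>
#include <cstdio>
#include <string>

// Stand-in percent-encoder: keep unreserved characters, encode everything else.
static std::string PercentEncode(const std::string &input) {
	std::string out;
	for (unsigned char c : input) {
		if (std::isalnum(c) || c == '-' || c == '_' || c == '.' || c == '~') {
			out += static_cast<char>(c);
		} else {
			char buf[4];
			std::snprintf(buf, sizeof(buf), "%%%02X", static_cast<unsigned>(c));
			out += buf;
		}
	}
	return out;
}

// Mirrors the shape of IRCatalog::GetURLEncodedPrefix: skip encoding when the
// request is signed and sent through the AWS SDK (SigV4), encode otherwise.
static std::string GetPrefixForURL(const std::string &prefix, bool uses_sigv4) {
	return uses_sigv4 ? prefix : PercentEncode(prefix);
}

int main() {
	// Hypothetical S3 Tables catalog prefix.
	const std::string prefix = "arn:aws:s3tables:us-east-2:123456789012:bucket/demo";
	std::printf("sigv4: %s\n", GetPrefixForURL(prefix, true).c_str());  // unchanged
	std::printf("other: %s\n", GetPrefixForURL(prefix, false).c_str()); // ':' -> %3A, '/' -> %2F
	return 0;
}
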

string IRCatalog::GetOnlyMergeOnReadSupportedErrorMessage(const string &table_name, const string &property,
const string &property_value) {
return StringUtil::Format("DuckDB-Iceberg only supports merge-on-read for updates/deletes. Table Property '%s' is "
8 changes: 8 additions & 0 deletions test/sql/cloud/s3tables/test_logging_aws.test
@@ -49,3 +49,11 @@ select request.url, request.type from duckdb_logs_parsed('HTTP') where request.t
https://s3tables.us-east-2.amazonaws.com/iceberg/v1/arn:aws:s3tables:us-east-2:840140254803:bucket/iceberg-testing/namespaces/tpch_sf1 GET
https://s3tables.us-east-2.amazonaws.com/iceberg/v1/arn:aws:s3tables:us-east-2:840140254803:bucket/iceberg-testing/namespaces/tpch_sf1/tables/region HEAD
https://s3tables.us-east-2.amazonaws.com/iceberg/v1/arn:aws:s3tables:us-east-2:840140254803:bucket/iceberg-testing/namespaces/tpch_sf1/tables/region GET

query I
select contains(map_keys(response.headers), 'x-amzn-requestid') from duckdb_logs_parsed('HTTP') where request.url like '%iceberg/v1%' order by timestamp;
----
1
1
1
1
17 changes: 16 additions & 1 deletion test/sql/local/irc/test_nessie.test
@@ -46,7 +46,7 @@ attach '' as my_datalake (
<REGEX>:.*Unrecognized access mode.*Supported options are 'vended_credentials' and 'none'.*

statement ok
attach '' as my_datalake (
attach 'warehouse' as my_datalake (
type ICEBERG,
ENDPOINT 'http://127.0.0.1:19120/iceberg/',
ACCESS_DELEGATION_MODE 'none',
@@ -65,6 +65,21 @@ insert into my_datalake.default.t1 select range a from range(10);
statement ok
create table if not exists my_datalake.default.t2 as select range a from range(1000);

query I
select * from my_datalake.default.t2 order by a limit 10;
----
0
1
2
3
4
5
6
7
8
9


statement ok
drop table if exists my_datalake.default.t2;
