Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions src/Databases/DataLake/ICatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,19 @@ void TableMetadata::setLocation(const std::string & location_)
auto pos_to_path = location_.substr(pos_to_bucket).find('/');

if (pos_to_path == std::string::npos)
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);

pos_to_path = pos_to_bucket + pos_to_path;
{ // empty path
location_without_path = location_;
path.clear();
bucket = location_.substr(pos_to_bucket);
}
else
{
pos_to_path = pos_to_bucket + pos_to_path;

location_without_path = location_.substr(0, pos_to_path);
path = location_.substr(pos_to_path + 1);
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
location_without_path = location_.substr(0, pos_to_path);
path = location_.substr(pos_to_path + 1);
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
}

LOG_TEST(getLogger("TableMetadata"),
"Parsed location without path: {}, path: {}",
Expand Down
14 changes: 8 additions & 6 deletions src/IO/S3/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
auto bucket_uri = getURIForBucket(bucket);
if (!bucket_uri)
{
if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
return *maybe_error;

if (auto region = getRegionForBucket(bucket); !region.empty())
Expand Down Expand Up @@ -584,7 +584,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
if (auto uri = getURIForBucket(bucket); uri.has_value())
request.overrideURI(std::move(*uri));


bool found_new_endpoint = false;
// if we found correct endpoint after 301 responses, update the cache for future requests
SCOPE_EXIT(
Expand Down Expand Up @@ -864,12 +863,15 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
}

// Do a list request because head requests don't have body in response
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
// S3 Tables don't support ListObjects, so made dirty workaroung - changed on GetObject
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
{
ListObjectsV2Request req;
GetObjectRequest req;
req.SetBucket(bucket);
req.SetMaxKeys(1);
auto result = ListObjectsV2(req);
req.SetKey(key);
req.SetRange("bytes=0-1");
auto result = GetObject(req);

if (result.IsSuccess())
return std::nullopt;
return result.GetError();
Expand Down
2 changes: 1 addition & 1 deletion src/IO/S3/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class Client : private Aws::S3::S3Client

void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;

std::optional<S3::URI> getURIForBucket(const std::string & bucket) const;

Expand Down
62 changes: 62 additions & 0 deletions src/IO/S3/URI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
validateKey(key, uri);
}

bool URI::isAWSRegion(std::string_view region)
{
/// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
static const std::unordered_set<std::string_view> regions = {
"us-east-2",
"us-east-1",
"us-west-1",
"us-west-2",
"af-south-1",
"ap-east-1",
"ap-south-2",
"ap-southeast-3",
"ap-southeast-5",
"ap-southeast-4",
"ap-south-1",
"ap-northeast-3",
"ap-northeast-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-east-2",
"ap-southeast-7",
"ap-northeast-1",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-west-1",
"eu-west-2",
"eu-south-1",
"eu-west-3",
"eu-south-2",
"eu-north-1",
"eu-central-2",
"il-central-1",
"mx-central-1",
"me-south-1",
"me-central-1",
"sa-east-1",
"us-gov-east-1",
"us-gov-west-1"
};

/// 's3-us-west-2' is a legacy region format for S3 storage, equals to 'us-west-2'
/// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
if (region.substr(0, 3) == "s3-")
region = region.substr(3);

return regions.contains(region);
}

void URI::addRegionToURI(const std::string &region)
{
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
{
if (pos > 0)
{ /// Check if region is already in endpoint to avoid add it second time
auto prev_pos = endpoint.find_last_of("/.", pos - 1);
if (prev_pos == std::string::npos)
prev_pos = 0;
else
++prev_pos;
std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
if (isAWSRegion(endpoint_region))
return;
}
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
}
}

void URI::validateBucket(const String & bucket, const Poco::URI & uri)
Expand Down
4 changes: 4 additions & 0 deletions src/IO/S3/URI.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ struct URI
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
static void validateKey(const std::string & key, const Poco::URI & uri);

/// Returns true if 'region' string is an AWS S3 region
/// https://docs.aws.amazon.com/general/latest/gr/s3.html
static bool isAWSRegion(std::string_view region);

private:
std::pair<std::string, std::optional<std::string>> getURIAndArchivePattern(const std::string & source);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ void IcebergMetadata::updateSnapshot(Poco::JSON::Object::Ptr metadata_object)

relevant_snapshot = IcebergSnapshot{
getManifestList(getProperFilePathFromMetadataInfo(
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPath(), table_location)),
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace())),
relevant_snapshot_id, total_rows, total_bytes};

if (!snapshot->has(f_schema_id))
Expand Down Expand Up @@ -678,7 +678,7 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(const String & filename)
for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i)
{
const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet<std::string>();
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location);
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace());
Int64 added_sequence_number = 0;
if (format_version > 1)
added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, f_sequence_number, TypeIndex::Int64).safeGet<Int64>();
Expand Down Expand Up @@ -805,6 +805,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64
schema_processor,
inherited_sequence_number,
table_location,
configuration_ptr->getNamespace(),
getContext());
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ ManifestFileContent::ManifestFileContent(
const IcebergSchemaProcessor & schema_processor,
Int64 inherited_sequence_number,
const String & table_location,
const String & common_namespace,
DB::ContextPtr context)
{
this->schema_id = schema_id_;
Expand Down Expand Up @@ -191,7 +192,11 @@ ManifestFileContent::ManifestFileContent(
}
const auto status = ManifestEntryStatus(manifest_file_deserializer.getValueFromRowByName(i, f_status, TypeIndex::Int32).safeGet<UInt64>());

const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(), common_path, table_location);
const auto file_path = getProperFilePathFromMetadataInfo(
manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet<String>(),
common_path,
table_location,
common_namespace);

/// NOTE: This is weird, because in manifest file partition looks like this:
/// {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class ManifestFileContent
const DB::IcebergSchemaProcessor & schema_processor,
Int64 inherited_sequence_number,
const std::string & table_location,
const std::string & common_namespace,
DB::ContextPtr context);

const std::vector<ManifestFileEntry> & getFiles() const;
Expand Down
21 changes: 19 additions & 2 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ using namespace DB;
// This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files.
// For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro
// Common path should end with "<table_name>" or "<table_name>/".
std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location)
std::string getProperFilePathFromMetadataInfo(
std::string_view data_path,
std::string_view common_path,
std::string_view table_location,
std::string_view common_namespace)
{
auto trim_backward_slash = [](std::string_view str) -> std::string_view
{
Expand Down Expand Up @@ -84,7 +88,20 @@ std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::s
}
else
{
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path);
/// Data files can have different path
pos = data_path.find("://");
if (pos == std::string::npos)
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
pos = data_path.find("/", pos + 3);
if (pos == std::string::npos)
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
if (data_path.substr(pos + 1).starts_with(common_namespace))
{
auto new_pos = data_path.find("/", pos + 1);
if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path
pos = new_pos;
}
return std::string(data_path.substr(pos));
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
namespace Iceberg
{

std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location);
std::string getProperFilePathFromMetadataInfo(
std::string_view data_path,
std::string_view common_path,
std::string_view table_location,
std::string_view common_namespace);

}

Expand Down
Loading