From 73ed249464594bef78f73cab02a994563374aedd Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Wed, 23 Jul 2025 09:40:07 +0000 Subject: [PATCH 1/6] Support read_all in the async client --- google/cloud/storage/async/client.cc | 17 +++ google/cloud/storage/async/client.h | 41 +++++++ google/cloud/storage/async/client_test.cc | 100 ++++++++++++++++++ .../storage/examples/storage_async_samples.cc | 8 +- 4 files changed, 162 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/async/client.cc b/google/cloud/storage/async/client.cc index 29b875d7c0f90..9b6aa48a7e4cd 100644 --- a/google/cloud/storage/async/client.cc +++ b/google/cloud/storage/async/client.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/storage/async/client.h" +#include "google/cloud/storage/async/read_all.h" #include "google/cloud/storage/internal/async/connection_impl.h" #include "google/cloud/storage/internal/async/connection_tracing.h" #include "google/cloud/storage/internal/async/default_options.h" @@ -112,6 +113,22 @@ future> AsyncClient::ReadObjectRange( internal::MergeOptions(std::move(opts), connection_->options())}); } +future> AsyncClient::ReadAll( + BucketName const& bucket_name, std::string object_name, Options opts) { + auto request = google::storage::v2::ReadObjectRequest{}; + request.set_bucket(bucket_name.FullName()); + request.set_object(std::move(object_name)); + return ReadAll(std::move(request), std::move(opts)); +} + +future> AsyncClient::ReadAll( + google::storage::v2::ReadObjectRequest request, Options opts) { + request.clear_read_offset(); + request.clear_read_limit(); + return storage_experimental::ReadAll( + ReadObject(std::move(request), std::move(opts))); +} + future>> AsyncClient::StartAppendableObjectUpload(BucketName const& bucket_name, std::string object_name, diff --git a/google/cloud/storage/async/client.h b/google/cloud/storage/async/client.h index ac544804a7c07..a66015cfc32e1 100644 --- a/google/cloud/storage/async/client.h 
+++ b/google/cloud/storage/async/client.h @@ -370,6 +370,47 @@ class AsyncClient { google::storage::v2::ReadObjectRequest request, std::int64_t offset, std::int64_t limit, Options opts = {}); + /** + * Reads the full contents of an object. + * + * When satisfied, the returned future has the full contents of the given + * object. + * + * Be aware that this will accumulate all the bytes in memory. For large + * objects, this function may fail with `StatusCode::kResourceExhausted` if + * the system runs out of memory. If you need to process large objects, + * consider using `ReadObject()` instead. + * + * @par Example + * @snippet storage_async_samples.cc read-all + * + * @par Idempotency + * This is a read-only operation and is always idempotent. Once the download + * starts, this operation will automatically resume the download if it is + * interrupted. Use `ResumePolicyOption` and `ResumePolicy` to control this. + * + * @param bucket_name the name of the bucket that contains the object. + * @param object_name the name of the object to be read. + * @param opts options controlling the behavior of this RPC, for example + * the application may change the retry policy. + */ + future> ReadAll(BucketName const& bucket_name, + std::string object_name, + Options opts = {}); + + /** + * @copydoc ReadAll(BucketName const&, std::string, Options) + * + * @param request the request contents, it must include the bucket name and + * object names. Many other fields are optional. Any values for + * `read_offset()` and `read_limit()` are ignored. To read a range of + * the object use `ReadObjectRange()`. + * @param opts options controlling the behavior of this RPC, for example + * the application may change the retry policy. + */ + future> ReadAll( + google::storage::v2::ReadObjectRequest request, Options opts = {}); + /* [start-appendable-object-upload] Initiates a [resumable upload][resumable-link] for an appendable object. 
diff --git a/google/cloud/storage/async/client_test.cc b/google/cloud/storage/async/client_test.cc index 5ad482ad1bed8..af54017a28f23 100644 --- a/google/cloud/storage/async/client_test.cc +++ b/google/cloud/storage/async/client_test.cc @@ -379,6 +379,106 @@ TEST(AsyncClient, ReadObject2) { "empty response", [](auto const& p) { return p.size(); }, 0))); } +TEST(AsyncClient, ReadAll1) { + auto constexpr kExpectedRequest = R"pb( + bucket: "projects/_/buckets/test-bucket" + object: "test-object" + )pb"; + auto mock = std::make_shared(); + EXPECT_CALL(*mock, options) + .WillRepeatedly( + Return(Options{}.set>("O0").set>("O1"))); + + EXPECT_CALL(*mock, ReadObject) + .WillOnce([&](AsyncConnection::ReadObjectParams const& p) { + EXPECT_THAT(p.options.get>(), "O0"); + EXPECT_THAT(p.options.get>(), "O1-function"); + EXPECT_THAT(p.options.get>(), "O2-function"); + auto expected = google::storage::v2::ReadObjectRequest{}; + EXPECT_TRUE(TextFormat::ParseFromString(kExpectedRequest, &expected)); + EXPECT_THAT(p.request, IsProtoEqual(expected)); + auto reader = std::make_unique(); + EXPECT_CALL(*reader, Read) + .WillOnce([]() { + return make_ready_future( + AsyncReaderConnection::ReadResponse{ReadPayload("test-")}); + }) + .WillOnce([]() { + return make_ready_future( + AsyncReaderConnection::ReadResponse{ReadPayload("payload")}); + }) + .WillOnce([]() { + return make_ready_future( + AsyncReaderConnection::ReadResponse{Status{}}); + }); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(reader)))); + }); + + auto client = AsyncClient(mock); + auto payload = client + .ReadAll(BucketName("test-bucket"), "test-object", + Options{} + .set>("O1-function") + .set>("O2-function")) + .get(); + ASSERT_STATUS_OK(payload); + EXPECT_THAT(payload->contents(), ElementsAre("test-", "payload")); +} + +TEST(AsyncClient, ReadAll2) { + auto constexpr kOriginalRequest = R"pb( + bucket: "test-only-invalid" + object: "test-object" + generation: 42 + read_offset: 123 + read_limit: 
456 + )pb"; + auto constexpr kExpectedRequest = + R"pb( + bucket: "test-only-invalid" object: "test-object" generation: 42 + )pb"; + auto mock = std::make_shared(); + EXPECT_CALL(*mock, options) + .WillRepeatedly( + Return(Options{}.set>("O0").set>("O1"))); + + EXPECT_CALL(*mock, ReadObject) + .WillOnce([&](AsyncConnection::ReadObjectParams const& p) { + EXPECT_THAT(p.options.get>(), "O0"); + EXPECT_THAT(p.options.get>(), "O1-function"); + EXPECT_THAT(p.options.get>(), "O2-function"); + auto expected = google::storage::v2::ReadObjectRequest{}; + EXPECT_TRUE(TextFormat::ParseFromString(kExpectedRequest, &expected)); + EXPECT_THAT(p.request, IsProtoEqual(expected)); + + auto reader = std::make_unique(); + EXPECT_CALL(*reader, Read) + .WillOnce([] { + return make_ready_future( + AsyncReaderConnection::ReadResponse(ReadPayload("payload"))); + }) + .WillOnce([] { + return make_ready_future( + AsyncReaderConnection::ReadResponse(Status{})); + }); + return make_ready_future(make_status_or( + std::unique_ptr(std::move(reader)))); + }); + + auto client = AsyncClient(mock); + auto request = google::storage::v2::ReadObjectRequest{}; + EXPECT_TRUE(TextFormat::ParseFromString(kOriginalRequest, &request)); + auto payload = + client + .ReadAll(std::move(request), Options{} + .set>("O1-function") + .set>("O2-function")) + .get(); + ASSERT_STATUS_OK(payload); + EXPECT_THAT(payload->contents(), ElementsAre("payload")); +} + TEST(AsyncClient, StartAppendableObjectUpload1) { auto constexpr kExpectedRequest = R"pb( write_object_spec { diff --git a/google/cloud/storage/examples/storage_async_samples.cc b/google/cloud/storage/examples/storage_async_samples.cc index 3b690dc0ff46a..4ae6663d24482 100644 --- a/google/cloud/storage/examples/storage_async_samples.cc +++ b/google/cloud/storage/examples/storage_async_samples.cc @@ -235,10 +235,10 @@ void ReadAll(google::cloud::storage_experimental::AsyncClient& client, std::string object_name) -> google::cloud::future { // For small objects, 
consider `ReadAll()` which accumulates all the // contents in memory using background threads. - auto payload = (co_await gcs_ex::ReadAll(client.ReadObject( - gcs_ex::BucketName(std::move(bucket_name)), - std::move(object_name)))) - .value(); + auto payload = + (co_await client.ReadAll(gcs_ex::BucketName(std::move(bucket_name)), + std::move(object_name))) + .value(); std::uint64_t count = 0; for (auto const& buffer : payload.contents()) { count += std::count(buffer.begin(), buffer.end(), '\n'); From 2ce085670d2b82e2e3cdf50235c52b096bac9b63 Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Thu, 24 Jul 2025 06:12:16 +0000 Subject: [PATCH 2/6] fixing code comments --- google/cloud/storage/async/client.h | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/async/client.h b/google/cloud/storage/async/client.h index a66015cfc32e1..a0f01ce83b0c9 100644 --- a/google/cloud/storage/async/client.h +++ b/google/cloud/storage/async/client.h @@ -399,7 +399,23 @@ class AsyncClient { Options opts = {}); /** - * @copydoc ReadAll(BucketName const&, std::string, Options) + * Reads the full contents of an object. + * + * When satisfied, the returned future has the full contents of the given + * object. + * + * Be aware that this will accumulate all the bytes in memory. For large + * objects, this function may fail with `StatusCode::kResourceExhausted` if + * the system runs out of memory. If you need to process large objects, + * consider using `ReadObject()` instead. + * + * @par Example + * @snippet storage_async_samples.cc read-all + * + * @par Idempotency + * This is a read-only operation and is always idempotent. Once the download + * starts, this operation will automatically resume the download if it is + * interrupted. Use `ResumePolicyOption` and `ResumePolicy` to control this. * * @param request the request contents, it must include the bucket name and * object names. Many other fields are optional. 
Any values for From 6933c6f4a6755d8e29f7c8a508ba965152055786 Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Wed, 13 Aug 2025 12:10:41 +0000 Subject: [PATCH 3/6] adding read_all in async client --- google/cloud/storage/async/client.cc | 14 ++++- google/cloud/storage/async/client.h | 19 +++++++ google/cloud/storage/async/client_test.cc | 67 +++++++++++++---------- 3 files changed, 69 insertions(+), 31 deletions(-) diff --git a/google/cloud/storage/async/client.cc b/google/cloud/storage/async/client.cc index 9b6aa48a7e4cd..0306ce42c8e9a 100644 --- a/google/cloud/storage/async/client.cc +++ b/google/cloud/storage/async/client.cc @@ -125,8 +125,18 @@ future> AsyncClient::ReadAll( google::storage::v2::ReadObjectRequest request, Options opts) { request.clear_read_offset(); request.clear_read_limit(); - return storage_experimental::ReadAll( - ReadObject(std::move(request), std::move(opts))); + auto reader_future = ReadObject(std::move(request), std::move(opts)); + return reader_future.then( + [this](future>> f) { + auto r = f.get(); + if (!r) return make_ready_future(StatusOr(r.status())); + return ReadAll(std::move(r->first), std::move(r->second)); + }); +} + +future> AsyncClient::ReadAll(AsyncReader reader, + AsyncToken token) { + return storage_experimental::ReadAll(std::move(reader), std::move(token)); } future>> diff --git a/google/cloud/storage/async/client.h b/google/cloud/storage/async/client.h index a0f01ce83b0c9..1971935a2b08c 100644 --- a/google/cloud/storage/async/client.h +++ b/google/cloud/storage/async/client.h @@ -427,6 +427,25 @@ class AsyncClient { future> ReadAll( google::storage::v2::ReadObjectRequest request, Options opts = {}); + /** + * Reads the full contents of an object from an `AsyncReader`. + * + * This function consumes the reader and token to read all the data from the + * underlying stream and accumulates it in memory. + * + * Be aware that this will accumulate all the bytes in memory. 
For large + * objects, this function may fail with `StatusCode::kResourceExhausted` if + * the system runs out of memory. + * + * @par Idempotency + * This operation will automatically resume the download if it is + * interrupted. Use `ResumePolicyOption` and `ResumePolicy` to control this. + * + * @param reader The asynchronous reader to consume. + * @param token The token to start reading. + */ + future> ReadAll(AsyncReader reader, AsyncToken token); + /* [start-appendable-object-upload] Initiates a [resumable upload][resumable-link] for an appendable object. diff --git a/google/cloud/storage/async/client_test.cc b/google/cloud/storage/async/client_test.cc index af54017a28f23..3568827bfadd6 100644 --- a/google/cloud/storage/async/client_test.cc +++ b/google/cloud/storage/async/client_test.cc @@ -53,6 +53,21 @@ struct TestOption { using Type = std::string; }; +auto MakeTestReaderConnection(std::vector chunks) { + auto reader_impl = std::make_unique(); + ::testing::InSequence seq; + for (auto& chunk : chunks) { + EXPECT_CALL(*reader_impl, Read).WillOnce([c = std::move(chunk)] { + return make_ready_future( + AsyncReaderConnection::ReadResponse{ReadPayload(std::move(c))}); + }); + } + EXPECT_CALL(*reader_impl, Read).WillOnce([] { + return make_ready_future(AsyncReaderConnection::ReadResponse{Status{}}); + }); + return reader_impl; +} + auto TestProtoObject() { google::storage::v2::Object result; result.set_bucket("projects/_/buckets/test-bucket"); @@ -397,22 +412,9 @@ TEST(AsyncClient, ReadAll1) { auto expected = google::storage::v2::ReadObjectRequest{}; EXPECT_TRUE(TextFormat::ParseFromString(kExpectedRequest, &expected)); EXPECT_THAT(p.request, IsProtoEqual(expected)); - auto reader = std::make_unique(); - EXPECT_CALL(*reader, Read) - .WillOnce([]() { - return make_ready_future( - AsyncReaderConnection::ReadResponse{ReadPayload("test-")}); - }) - .WillOnce([]() { - return make_ready_future( - AsyncReaderConnection::ReadResponse{ReadPayload("payload")}); - }) - 
.WillOnce([]() { - return make_ready_future( - AsyncReaderConnection::ReadResponse{Status{}}); - }); - return make_ready_future(make_status_or( - std::unique_ptr(std::move(reader)))); + return make_ready_future( + make_status_or(std::unique_ptr( + MakeTestReaderConnection({"test-", "payload"})))); }); auto client = AsyncClient(mock); @@ -451,19 +453,9 @@ TEST(AsyncClient, ReadAll2) { auto expected = google::storage::v2::ReadObjectRequest{}; EXPECT_TRUE(TextFormat::ParseFromString(kExpectedRequest, &expected)); EXPECT_THAT(p.request, IsProtoEqual(expected)); - - auto reader = std::make_unique(); - EXPECT_CALL(*reader, Read) - .WillOnce([] { - return make_ready_future( - AsyncReaderConnection::ReadResponse(ReadPayload("payload"))); - }) - .WillOnce([] { - return make_ready_future( - AsyncReaderConnection::ReadResponse(Status{})); - }); - return make_ready_future(make_status_or( - std::unique_ptr(std::move(reader)))); + return make_ready_future( + make_status_or(std::unique_ptr( + MakeTestReaderConnection({"payload"})))); }); auto client = AsyncClient(mock); @@ -479,6 +471,23 @@ TEST(AsyncClient, ReadAll2) { EXPECT_THAT(payload->contents(), ElementsAre("payload")); } +TEST(AsyncClient, ReadAllFromReader) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, options) + .WillRepeatedly( + Return(Options{}.set>("O0").set>("O1"))); + + auto reader_impl = MakeTestReaderConnection({"test-", "payload"}); + auto* reader_impl_ptr = reader_impl.get(); + auto reader = AsyncReader(std::move(reader_impl)); + auto token = storage_internal::MakeAsyncToken(reader_impl_ptr); + + auto client = AsyncClient(mock); + auto payload = client.ReadAll(std::move(reader), std::move(token)).get(); + ASSERT_STATUS_OK(payload); + EXPECT_THAT(payload->contents(), ElementsAre("test-", "payload")); +} + TEST(AsyncClient, StartAppendableObjectUpload1) { auto constexpr kExpectedRequest = R"pb( write_object_spec { From 73ea303f11fe013f4a6dbbf48bb57d4624ed10b4 Mon Sep 17 00:00:00 2001 From: Shubham 
Kaushal Date: Wed, 13 Aug 2025 12:28:48 +0000 Subject: [PATCH 4/6] fixing presubmits --- google/cloud/storage/async/client.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/async/client.h b/google/cloud/storage/async/client.h index 1971935a2b08c..199f3cd69c280 100644 --- a/google/cloud/storage/async/client.h +++ b/google/cloud/storage/async/client.h @@ -444,7 +444,8 @@ class AsyncClient { * @param reader The asynchronous reader to consume. * @param token The token to start reading. */ - future> ReadAll(AsyncReader reader, AsyncToken token); + static future> ReadAll(AsyncReader reader, + AsyncToken token); /* [start-appendable-object-upload] From a487594d8a4a77744eb27bf121ef40d7f588134a Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Wed, 13 Aug 2025 13:24:24 +0000 Subject: [PATCH 5/6] fixing presubmits --- google/cloud/storage/async/client.cc | 2 +- google/cloud/storage/async/client_test.cc | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/async/client.cc b/google/cloud/storage/async/client.cc index 0306ce42c8e9a..c1ac7052d0c13 100644 --- a/google/cloud/storage/async/client.cc +++ b/google/cloud/storage/async/client.cc @@ -127,7 +127,7 @@ future> AsyncClient::ReadAll( request.clear_read_limit(); auto reader_future = ReadObject(std::move(request), std::move(opts)); return reader_future.then( - [this](future>> f) { + [](future>> f) { auto r = f.get(); if (!r) return make_ready_future(StatusOr(r.status())); return ReadAll(std::move(r->first), std::move(r->second)); diff --git a/google/cloud/storage/async/client_test.cc b/google/cloud/storage/async/client_test.cc index 3568827bfadd6..e687a8121edbd 100644 --- a/google/cloud/storage/async/client_test.cc +++ b/google/cloud/storage/async/client_test.cc @@ -483,7 +483,8 @@ TEST(AsyncClient, ReadAllFromReader) { auto token = storage_internal::MakeAsyncToken(reader_impl_ptr); auto client = AsyncClient(mock); - auto payload = 
client.ReadAll(std::move(reader), std::move(token)).get(); + auto payload = + AsyncClient::ReadAll(std::move(reader), std::move(token)).get(); ASSERT_STATUS_OK(payload); EXPECT_THAT(payload->contents(), ElementsAre("test-", "payload")); } From 7fd64bc249a9cd66d9791ba8e66936e873b7606a Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Thu, 14 Aug 2025 03:51:29 +0000 Subject: [PATCH 6/6] adding tests --- google/cloud/storage/async/client_test.cc | 30 +++++++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/google/cloud/storage/async/client_test.cc b/google/cloud/storage/async/client_test.cc index e687a8121edbd..8d2b49a77726c 100644 --- a/google/cloud/storage/async/client_test.cc +++ b/google/cloud/storage/async/client_test.cc @@ -41,6 +41,7 @@ using ::google::cloud::storage_mocks::MockAsyncWriterConnection; using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::IsOkAndHolds; using ::google::cloud::testing_util::IsProtoEqual; +using ::google::cloud::testing_util::StatusIs; using ::google::protobuf::TextFormat; using ::testing::ElementsAre; using ::testing::Optional; @@ -471,18 +472,37 @@ TEST(AsyncClient, ReadAll2) { EXPECT_THAT(payload->contents(), ElementsAre("payload")); } -TEST(AsyncClient, ReadAllFromReader) { +TEST(AsyncClient, ReadAllRequestFailure) { + auto constexpr kRequestText = R"pb( + bucket: "projects/_/buckets/test-bucket" + object: "test-object" + )pb"; auto mock = std::make_shared(); - EXPECT_CALL(*mock, options) - .WillRepeatedly( - Return(Options{}.set>("O0").set>("O1"))); + EXPECT_CALL(*mock, options).WillRepeatedly(Return(Options{})); + EXPECT_CALL(*mock, ReadObject) + .WillOnce([&](AsyncConnection::ReadObjectParams const& p) { + auto expected = google::storage::v2::ReadObjectRequest{}; + EXPECT_TRUE(TextFormat::ParseFromString(kRequestText, &expected)); + EXPECT_THAT(p.request, IsProtoEqual(expected)); + return make_ready_future( + StatusOr>( + Status(StatusCode::kPermissionDenied, 
"uh-oh"))); + }); + + auto client = AsyncClient(mock); + auto request = google::storage::v2::ReadObjectRequest{}; + EXPECT_TRUE(TextFormat::ParseFromString(kRequestText, &request)); + auto payload = client.ReadAll(std::move(request)).get(); + EXPECT_THAT(payload, StatusIs(StatusCode::kPermissionDenied, "uh-oh")); +} + +TEST(AsyncClient, ReadAllFromReader) { auto reader_impl = MakeTestReaderConnection({"test-", "payload"}); auto* reader_impl_ptr = reader_impl.get(); auto reader = AsyncReader(std::move(reader_impl)); auto token = storage_internal::MakeAsyncToken(reader_impl_ptr); - auto client = AsyncClient(mock); auto payload = AsyncClient::ReadAll(std::move(reader), std::move(token)).get(); ASSERT_STATUS_OK(payload);