Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions google/cloud/storage/async/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/storage/async/client.h"
#include "google/cloud/storage/async/read_all.h"
#include "google/cloud/storage/internal/async/connection_impl.h"
#include "google/cloud/storage/internal/async/connection_tracing.h"
#include "google/cloud/storage/internal/async/default_options.h"
Expand Down Expand Up @@ -112,6 +113,32 @@ future<StatusOr<ReadPayload>> AsyncClient::ReadObjectRange(
internal::MergeOptions(std::move(opts), connection_->options())});
}

future<StatusOr<ReadPayload>> AsyncClient::ReadAll(
BucketName const& bucket_name, std::string object_name, Options opts) {
auto request = google::storage::v2::ReadObjectRequest{};
request.set_bucket(bucket_name.FullName());
request.set_object(std::move(object_name));
return ReadAll(std::move(request), std::move(opts));
}

future<StatusOr<ReadPayload>> AsyncClient::ReadAll(
google::storage::v2::ReadObjectRequest request, Options opts) {
request.clear_read_offset();
request.clear_read_limit();
auto reader_future = ReadObject(std::move(request), std::move(opts));
return reader_future.then(
[](future<StatusOr<std::pair<AsyncReader, AsyncToken>>> f) {
auto r = f.get();
if (!r) return make_ready_future(StatusOr<ReadPayload>(r.status()));
return ReadAll(std::move(r->first), std::move(r->second));
});
}

future<StatusOr<ReadPayload>> AsyncClient::ReadAll(AsyncReader reader,
AsyncToken token) {
return storage_experimental::ReadAll(std::move(reader), std::move(token));
}

future<StatusOr<std::pair<AsyncWriter, AsyncToken>>>
AsyncClient::StartAppendableObjectUpload(BucketName const& bucket_name,
std::string object_name,
Expand Down
77 changes: 77 additions & 0 deletions google/cloud/storage/async/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,83 @@ class AsyncClient {
google::storage::v2::ReadObjectRequest request, std::int64_t offset,
std::int64_t limit, Options opts = {});

/**
* Reads the full contents of an object.
*
* When satisfied, the returned future has the full contents of the given
* object.
*
* Be aware that this will accumulate all the bytes in memory. For large
* objects, this function may fail with `StatusCode::kResourceExhausted` if
* the system runs out of memory. If you need to process large objects,
* consider using `ReadObject()` instead.
*
* @par Example
* @snippet storage_async_samples.cc read-all
*
* @par Idempotency
* This is a read-only operation and is always idempotent. Once the download
* starts, this operation will automatically resume the download if is
* interrupted. Use `ResumePolicyOption` and `ResumePolicy` to control this.
*
* @param bucket_name the name of the bucket that contains the object.
* @param object_name the name of the object to be read.
* @param opts options controlling the behavior of this RPC, for example
* the application may change the retry policy.
*/
future<StatusOr<ReadPayload>> ReadAll(BucketName const& bucket_name,
std::string object_name,
Options opts = {});

/**
* Reads the full contents of an object.
*
* When satisfied, the returned future has the full contents of the given
* object.
*
* Be aware that this will accumulate all the bytes in memory. For large
* objects, this function may fail with `StatusCode::kResourceExhausted` if
* the system runs out of memory. If you need to process large objects,
* consider using `ReadObject()` instead.
*
* @par Example
* @snippet storage_async_samples.cc read-all
*
* @par Idempotency
* This is a read-only operation and is always idempotent. Once the download
* starts, this operation will automatically resume the download if is
* interrupted. Use `ResumePolicyOption` and `ResumePolicy` to control this.
*
* @param request the request contents, it must include the bucket name and
* object names. Many other fields are optional. Any values for
* `read_offset()` and `read_limit()` are ignored. To read a range of
* the object use `ReadObjectRange()`.
* @param opts options controlling the behavior of this RPC, for example
* the application may change the retry policy.
*/
future<StatusOr<ReadPayload>> ReadAll(
google::storage::v2::ReadObjectRequest request, Options opts = {});

/**
* Reads the full contents of an object from an `AsyncReader`.
*
* This function consumes the reader and token to read all the data from the
* underlying stream and accumulates it in memory.
*
* Be aware that this will accumulate all the bytes in memory. For large
* objects, this function may fail with `StatusCode::kResourceExhausted` if
* the system runs out of memory.
*
* @par Idempotency
* This operation will automatically resume the download if it is
* interrupted. Use `ResumePolicyOption` and `ResumePolicy` to control this.
*
* @param reader The asynchronous reader to consume.
* @param token The token to start reading.
*/
static future<StatusOr<ReadPayload>> ReadAll(AsyncReader reader,
AsyncToken token);

/*
[start-appendable-object-upload]
Initiates a [resumable upload][resumable-link] for an appendable object.
Expand Down
130 changes: 130 additions & 0 deletions google/cloud/storage/async/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ using ::google::cloud::storage_mocks::MockAsyncWriterConnection;
using ::google::cloud::testing_util::IsOk;
using ::google::cloud::testing_util::IsOkAndHolds;
using ::google::cloud::testing_util::IsProtoEqual;
using ::google::cloud::testing_util::StatusIs;
using ::google::protobuf::TextFormat;
using ::testing::ElementsAre;
using ::testing::Optional;
Expand All @@ -53,6 +54,21 @@ struct TestOption {
using Type = std::string;
};

auto MakeTestReaderConnection(std::vector<std::string> chunks) {
auto reader_impl = std::make_unique<MockAsyncReaderConnection>();
::testing::InSequence seq;
for (auto& chunk : chunks) {
EXPECT_CALL(*reader_impl, Read).WillOnce([c = std::move(chunk)] {
return make_ready_future(
AsyncReaderConnection::ReadResponse{ReadPayload(std::move(c))});
});
}
EXPECT_CALL(*reader_impl, Read).WillOnce([] {
return make_ready_future(AsyncReaderConnection::ReadResponse{Status{}});
});
return reader_impl;
}

auto TestProtoObject() {
google::storage::v2::Object result;
result.set_bucket("projects/_/buckets/test-bucket");
Expand Down Expand Up @@ -379,6 +395,120 @@ TEST(AsyncClient, ReadObject2) {
"empty response", [](auto const& p) { return p.size(); }, 0)));
}

TEST(AsyncClient, ReadAll1) {
auto constexpr kExpectedRequest = R"pb(
bucket: "projects/_/buckets/test-bucket"
object: "test-object"
)pb";
auto mock = std::make_shared<MockAsyncConnection>();
EXPECT_CALL(*mock, options)
.WillRepeatedly(
Return(Options{}.set<TestOption<0>>("O0").set<TestOption<1>>("O1")));

EXPECT_CALL(*mock, ReadObject)
.WillOnce([&](AsyncConnection::ReadObjectParams const& p) {
EXPECT_THAT(p.options.get<TestOption<0>>(), "O0");
EXPECT_THAT(p.options.get<TestOption<1>>(), "O1-function");
EXPECT_THAT(p.options.get<TestOption<2>>(), "O2-function");
auto expected = google::storage::v2::ReadObjectRequest{};
EXPECT_TRUE(TextFormat::ParseFromString(kExpectedRequest, &expected));
EXPECT_THAT(p.request, IsProtoEqual(expected));
return make_ready_future(
make_status_or(std::unique_ptr<AsyncReaderConnection>(
MakeTestReaderConnection({"test-", "payload"}))));
});

auto client = AsyncClient(mock);
auto payload = client
.ReadAll(BucketName("test-bucket"), "test-object",
Options{}
.set<TestOption<1>>("O1-function")
.set<TestOption<2>>("O2-function"))
.get();
ASSERT_STATUS_OK(payload);
EXPECT_THAT(payload->contents(), ElementsAre("test-", "payload"));
}

TEST(AsyncClient, ReadAll2) {
auto constexpr kOriginalRequest = R"pb(
bucket: "test-only-invalid"
object: "test-object"
generation: 42
read_offset: 123
read_limit: 456
)pb";
auto constexpr kExpectedRequest =
R"pb(
bucket: "test-only-invalid" object: "test-object" generation: 42
)pb";
auto mock = std::make_shared<MockAsyncConnection>();
EXPECT_CALL(*mock, options)
.WillRepeatedly(
Return(Options{}.set<TestOption<0>>("O0").set<TestOption<1>>("O1")));

EXPECT_CALL(*mock, ReadObject)
.WillOnce([&](AsyncConnection::ReadObjectParams const& p) {
EXPECT_THAT(p.options.get<TestOption<0>>(), "O0");
EXPECT_THAT(p.options.get<TestOption<1>>(), "O1-function");
EXPECT_THAT(p.options.get<TestOption<2>>(), "O2-function");
auto expected = google::storage::v2::ReadObjectRequest{};
EXPECT_TRUE(TextFormat::ParseFromString(kExpectedRequest, &expected));
EXPECT_THAT(p.request, IsProtoEqual(expected));
return make_ready_future(
make_status_or(std::unique_ptr<AsyncReaderConnection>(
MakeTestReaderConnection({"payload"}))));
});

auto client = AsyncClient(mock);
auto request = google::storage::v2::ReadObjectRequest{};
EXPECT_TRUE(TextFormat::ParseFromString(kOriginalRequest, &request));
auto payload =
client
.ReadAll(std::move(request), Options{}
.set<TestOption<1>>("O1-function")
.set<TestOption<2>>("O2-function"))
.get();
ASSERT_STATUS_OK(payload);
EXPECT_THAT(payload->contents(), ElementsAre("payload"));
}

TEST(AsyncClient, ReadAllRequestFailure) {
auto constexpr kRequestText = R"pb(
bucket: "projects/_/buckets/test-bucket"
object: "test-object"
)pb";
auto mock = std::make_shared<MockAsyncConnection>();
EXPECT_CALL(*mock, options).WillRepeatedly(Return(Options{}));

EXPECT_CALL(*mock, ReadObject)
.WillOnce([&](AsyncConnection::ReadObjectParams const& p) {
auto expected = google::storage::v2::ReadObjectRequest{};
EXPECT_TRUE(TextFormat::ParseFromString(kRequestText, &expected));
EXPECT_THAT(p.request, IsProtoEqual(expected));
return make_ready_future(
StatusOr<std::unique_ptr<AsyncReaderConnection>>(
Status(StatusCode::kPermissionDenied, "uh-oh")));
});

auto client = AsyncClient(mock);
auto request = google::storage::v2::ReadObjectRequest{};
EXPECT_TRUE(TextFormat::ParseFromString(kRequestText, &request));
auto payload = client.ReadAll(std::move(request)).get();
EXPECT_THAT(payload, StatusIs(StatusCode::kPermissionDenied, "uh-oh"));
}

TEST(AsyncClient, ReadAllFromReader) {
auto reader_impl = MakeTestReaderConnection({"test-", "payload"});
auto* reader_impl_ptr = reader_impl.get();
auto reader = AsyncReader(std::move(reader_impl));
auto token = storage_internal::MakeAsyncToken(reader_impl_ptr);

auto payload =
AsyncClient::ReadAll(std::move(reader), std::move(token)).get();
ASSERT_STATUS_OK(payload);
EXPECT_THAT(payload->contents(), ElementsAre("test-", "payload"));
}

TEST(AsyncClient, StartAppendableObjectUpload1) {
auto constexpr kExpectedRequest = R"pb(
write_object_spec {
Expand Down
8 changes: 4 additions & 4 deletions google/cloud/storage/examples/storage_async_samples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,10 @@ void ReadAll(google::cloud::storage_experimental::AsyncClient& client,
std::string object_name) -> google::cloud::future<std::uint64_t> {
// For small objects, consider `ReadAll()` which accumulates all the
// contents in memory using background threads.
auto payload = (co_await gcs_ex::ReadAll(client.ReadObject(
gcs_ex::BucketName(std::move(bucket_name)),
std::move(object_name))))
.value();
auto payload =
(co_await client.ReadAll(gcs_ex::BucketName(std::move(bucket_name)),
std::move(object_name)))
.value();
std::uint64_t count = 0;
for (auto const& buffer : payload.contents()) {
count += std::count(buffer.begin(), buffer.end(), '\n');
Expand Down
Loading