Skip to content

Conversation

JTaky
Copy link
Contributor

@JTaky JTaky commented Oct 10, 2025

What is the purpose of the change

Ensures that the operations become idempotent or atomic, allowing the GCS client to safely retry the 503 errors. Thus client can actually perform the retry in such conditions.

Original retry logic was added in #24753.

Thanks Xi Zhang for pointing to the problem and providing the solution in https://issues.apache.org/jira/browse/FLINK-38225.

Brief change log

  • Adds an option when composing the GCS request, to ensure that operation is atomic
  • Adds an option when writing to GCS, to ensure that write operation is atomic

Verifying this change

This change is already covered by existing tests, such as GSRecoverableWriterTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@JTaky JTaky marked this pull request as ready for review October 10, 2025 11:41
@flinkbot
Copy link
Collaborator

flinkbot commented Oct 10, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build


BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
com.google.cloud.WriteChannel writeChannel =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to junit this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can wrap the storage in the UT and ensure that proper option is passed.

For this we need to extend the Storage. Which is for InternalExtensionOnly.

Some e2e would be great to have, but from what I understand only Azure is available in CI.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried briefly to make some UT and without mocking it is hard. Currently GSBlobStorageImpl is too low level and it is the component which is being mocked in the codebase.

Creating a TestStorage to catch GCP calls is also hard, notable because some of the methods returns Blob object, which cannot be created without reflections.

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Oct 10, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants