-
Notifications
You must be signed in to change notification settings - Fork 4.6k
grpc: introduce ErrRetriesExhausted to wrap retry failures #8894
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ package status_test | |
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "reflect" | ||
| "strings" | ||
| "testing" | ||
|
|
@@ -259,3 +260,74 @@ func (s) TestStatus_ErrorDetailsMessageV1AndV2(t *testing.T) { | |
| } | ||
| } | ||
| } | ||
|
|
||
| func (s) TestFromError_Wrapped(t *testing.T) { | ||
| details := []protoadapt.MessageV1{ | ||
| &testpb.Empty{}, | ||
| } | ||
| s := status.New(codes.Canceled, "inner canceled") | ||
| sWithDetails, err := s.WithDetails(details...) | ||
| if err != nil { | ||
| t.Fatalf("WithDetails failed: %v", err) | ||
| } | ||
| innerErr := sWithDetails.Err() | ||
|
|
||
| testCases := []struct { | ||
| name string | ||
| err error | ||
| wantCode codes.Code | ||
| wantMessage string | ||
| wantDetails int | ||
| }{ | ||
| { | ||
| name: "direct_error", | ||
| err: innerErr, | ||
| wantCode: codes.Canceled, | ||
| wantMessage: "inner canceled", | ||
| wantDetails: 1, | ||
| }, | ||
| { | ||
| name: "wrapped_error", | ||
| err: fmt.Errorf("wrapped: %w", innerErr), | ||
| wantCode: codes.Canceled, | ||
| wantMessage: "wrapped: rpc error: code = Canceled desc = inner canceled", | ||
| wantDetails: 1, | ||
| }, | ||
| { | ||
| name: "double_wrapped_error", | ||
| err: fmt.Errorf("outer: %w", fmt.Errorf("inner: %w", innerErr)), | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| wantCode: codes.Canceled, | ||
| wantMessage: "outer: inner: rpc error: code = Canceled desc = inner canceled", | ||
| wantDetails: 1, | ||
| }, | ||
| { | ||
| name: "double_wrapped_single_errorf", | ||
| err: fmt.Errorf("error: %w: %w", errors.New("test error"), innerErr), | ||
| wantCode: codes.Canceled, | ||
| wantMessage: "error: test error: rpc error: code = Canceled desc = inner canceled", | ||
| wantDetails: 1, | ||
| }, | ||
| } | ||
|
|
||
| for _, tc := range testCases { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| if got := status.Code(tc.err); got != tc.wantCode { | ||
| t.Errorf("status.Code(%v) = %v; want %v", tc.err, got, tc.wantCode) | ||
| } | ||
|
|
||
| st, ok := status.FromError(tc.err) | ||
| if !ok { | ||
| t.Fatalf("status.FromError(%v) returned false; want true", tc.err) | ||
| } | ||
| if got := st.Code(); got != tc.wantCode { | ||
| t.Errorf("st.Code() = %v; want %v", got, tc.wantCode) | ||
| } | ||
| if got := st.Message(); got != tc.wantMessage { | ||
| t.Errorf("st.Message() = %q; want %q", got, tc.wantMessage) | ||
| } | ||
| if got := len(st.Details()); got != tc.wantDetails { | ||
| t.Errorf("len(st.Details()) = %v; want %v", got, tc.wantDetails) | ||
| } | ||
|
Comment on lines
+322
to
+330
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we instead have a |
||
| }) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ package grpc | |
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "math" | ||
| rand "math/rand/v2" | ||
|
|
@@ -147,6 +148,15 @@ type ClientStream interface { | |
| RecvMsg(m any) error | ||
| } | ||
|
|
||
| // ErrRetriesExhausted is returned when an operation exceeds its configured | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: This is not just any arbitrary operation. This only applies to RPCs, right? Can we make that more explicit here, since this will be part of the API? |
||
| // maximum number of retry attempts. | ||
| // | ||
| // # Experimental | ||
| // | ||
| // Notice: This type is EXPERIMENTAL and may be changed or removed in a | ||
| // later release. | ||
| var ErrRetriesExhausted = errors.New("max retry attempts exhausted") | ||
|
|
||
| // NewStream creates a new Stream for the client side. This is typically | ||
| // called by generated code. ctx is used for the lifetime of the stream. | ||
| // | ||
|
|
@@ -749,7 +759,11 @@ func (a *csAttempt) shouldRetry(err error) (bool, error) { | |
| return false, err | ||
| } | ||
| if cs.numRetries+1 >= rp.MaxAttempts { | ||
| return false, err | ||
| return false, fmt.Errorf("stopped after %d attempts: %w: %w", | ||
| cs.numRetries+1, | ||
| ErrRetriesExhausted, | ||
| err, | ||
| ) | ||
|
Comment on lines
+762
to
+766
Contributor
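There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Using two `%w` verbs in a single `fmt.Errorf` call causes a compilation error. Beyond the compilation error, there's a design challenge. The tests require that the resulting error both match `ErrRetriesExhausted` under `errors.Is` and still yield the original status code through `status.Code`. To satisfy both requirements, a solution would be to use a custom error type that wraps the original error and implements an `Is` method. For example:
// unexported custom error type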
// exhaustedError is the unexported error type proposed in this review
// comment. It pairs an underlying error with an attempt count.
type exhaustedError struct {
	// err is the underlying error; Unwrap returns it.
	err error
	// attempts is the count interpolated into the Error message.
	attempts int
}
// Error renders the attempt count, the ErrRetriesExhausted sentinel and the
// underlying error as a single message.
func (e *exhaustedError) Error() string {
	const format = "stopped after %d attempts: %v: %v"
	return fmt.Sprintf(format, e.attempts, ErrRetriesExhausted, e.err)
}
// Unwrap returns the wrapped error so that errors.Is and errors.As can
// continue past this wrapper into the underlying error.
func (e *exhaustedError) Unwrap() error {
	return e.err
}
func (e *exhaustedError) Is(target error) bool {
return target == ErrRetriesExhausted
}
Then you could construct the error as `&exhaustedError{err: err, attempts: cs.numRetries + 1}`.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
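While older versions of Go restricted the use of multiple `%w` verbs in a single `fmt.Errorf` call, Go 1.20 and later support it.
As a result, the error matches `ErrRetriesExhausted` via `errors.Is` while still exposing the original status code through `status.Code`. |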
||
| } | ||
|
|
||
| var dur time.Duration | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ package test | |
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "net" | ||
|
|
@@ -275,15 +276,15 @@ func (s) TestRetryStreaming(t *testing.T) { | |
| } | ||
| cErr := func(c codes.Code) clientOp { | ||
| return func(stream testgrpc.TestService_FullDuplexCallClient) error { | ||
| want := status.New(c, "this is a test error").Err() | ||
| if c == codes.OK { | ||
| want = io.EOF | ||
| } | ||
| res, err := stream.Recv() | ||
| if res != nil || | ||
| ((err == nil) != (want == nil)) || | ||
| (want != nil && err.Error() != want.Error()) { | ||
| return fmt.Errorf("client: Recv() = %v, %v; want <nil>, %v", res, err, want) | ||
| var gotCode codes.Code | ||
| if err == io.EOF { | ||
| gotCode = codes.OK | ||
| } else { | ||
| gotCode = status.Code(err) | ||
| } | ||
| if res != nil || gotCode != c { | ||
| return fmt.Errorf("client: Recv() = %v, %v; want <nil>, %v", res, err, c) | ||
| } | ||
| return nil | ||
| } | ||
|
|
@@ -550,7 +551,10 @@ func (s) TestMaxCallAttempts(t *testing.T) { | |
| t.Fatalf("client: Recv() = %s, %v; want <nil>, error", got, err) | ||
| } else if status.Code(err) != codes.Unavailable { | ||
| t.Fatalf("client: Recv() = _, %v; want _, Unavailable", err) | ||
| } else if !errors.Is(err, grpc.ErrRetriesExhausted) { | ||
| t.Fatalf("want: ErrRetriesExhausted, got: %v", err) | ||
| } | ||
|
|
||
| if streamCallCount != tc.expectedAttempts { | ||
| t.Fatalf("stream expectedAttempts = %v; want %v", streamCallCount, tc.expectedAttempts) | ||
| } | ||
|
|
@@ -816,3 +820,107 @@ func (s) TestRetryTransparentWhenCommitted(t *testing.T) { | |
| stream1.Recv() | ||
| stream1.Send(&testpb.StreamingOutputCallRequest{}) | ||
| } | ||
|
|
||
| func (s) TestRetryDisabled(t *testing.T) { | ||
| ss := &stubserver.StubServer{ | ||
| FullDuplexCallF: func(testgrpc.TestService_FullDuplexCallServer) error { | ||
| return status.New(codes.Unavailable, "retryable error").Err() | ||
| }, | ||
| EmptyCallF: func(context.Context, *testpb.Empty) (r *testpb.Empty, err error) { | ||
| return nil, status.New(codes.Unavailable, "retryable error").Err() | ||
| }, | ||
| } | ||
| if err := ss.Start([]grpc.ServerOption{}, | ||
| grpc.WithDefaultServiceConfig(`{ | ||
| "methodConfig": [{ | ||
| "name": [{"service": "grpc.testing.TestService"}], | ||
| "retryPolicy": { | ||
| "MaxAttempts": 4, | ||
| "InitialBackoff": ".01s", | ||
| "MaxBackoff": ".01s", | ||
| "BackoffMultiplier": 1.0, | ||
| "RetryableStatusCodes": [ "UNAVAILABLE" ] | ||
| } | ||
| }]}`), | ||
| grpc.WithDisableRetry()); err != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think this falls under go/go-style/decisions#indentation-confusion. Can we have the dial options initialized in a separate slice and have them be passed here, so that the |
||
| t.Fatalf("Error starting endpoint server: %v", err) | ||
| } | ||
| defer ss.Stop() | ||
|
|
||
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
| defer cancel() | ||
|
|
||
| // Test streaming RPC | ||
| stream, err := ss.Client.FullDuplexCall(ctx) | ||
| if err != nil { | ||
| t.Fatalf("Error while creating stream: %v", err) | ||
| } | ||
| _, err = stream.Recv() | ||
| if err == nil { | ||
| t.Fatalf("client: Recv() = <nil>, <nil>; want <nil>, error") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I know we have many error strings like this, but this should remain a relic of the past. This error message is not very readable. Something like "stream.Recv() succeeded when expected to fail" would be more readable. Here and elsewhere where this applies. |
||
| } | ||
| if status.Code(err) != codes.Unavailable { | ||
| t.Fatalf("client: Recv() = _, %v; want _, Unavailable", err) | ||
| } | ||
| if errors.Is(err, grpc.ErrRetriesExhausted) { | ||
| t.Fatalf("client: Recv() error matches ErrRetriesExhausted, want not match") | ||
| } | ||
|
|
||
| // Test unary RPC | ||
| _, err = ss.Client.EmptyCall(ctx, &testpb.Empty{}) | ||
| if err == nil { | ||
| t.Fatalf("client: EmptyCall() = <nil>, <nil>; want <nil>, error") | ||
| } | ||
| if status.Code(err) != codes.Unavailable { | ||
| t.Fatalf("client: EmptyCall() = _, %v; want _, Unavailable", err) | ||
| } | ||
| if errors.Is(err, grpc.ErrRetriesExhausted) { | ||
| t.Fatalf("client: EmptyCall() error matches ErrRetriesExhausted, want not match") | ||
| } | ||
| } | ||
|
|
||
| func (s) TestRetryNotConfigured(t *testing.T) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The test logic here and in the above test seems identical. The only difference is the dial options. Can we make it a table-driven test instead? |
||
| ss := &stubserver.StubServer{ | ||
| FullDuplexCallF: func(testgrpc.TestService_FullDuplexCallServer) error { | ||
| return status.New(codes.Unavailable, "retryable error").Err() | ||
| }, | ||
| EmptyCallF: func(context.Context, *testpb.Empty) (r *testpb.Empty, err error) { | ||
| return nil, status.New(codes.Unavailable, "retryable error").Err() | ||
| }, | ||
| } | ||
| if err := ss.Start([]grpc.ServerOption{}); err != nil { | ||
| t.Fatalf("Error starting endpoint server: %v", err) | ||
| } | ||
| defer ss.Stop() | ||
|
|
||
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
| defer cancel() | ||
|
|
||
| // Test streaming RPC | ||
| stream, err := ss.Client.FullDuplexCall(ctx) | ||
| if err != nil { | ||
| t.Fatalf("Error while creating stream: %v", err) | ||
| } | ||
| _, err = stream.Recv() | ||
| if err == nil { | ||
| t.Fatalf("client: Recv() = <nil>, <nil>; want <nil>, error") | ||
| } | ||
| if status.Code(err) != codes.Unavailable { | ||
| t.Fatalf("client: Recv() = _, %v; want _, Unavailable", err) | ||
| } | ||
| if errors.Is(err, grpc.ErrRetriesExhausted) { | ||
| t.Fatalf("client: Recv() error matches ErrRetriesExhausted, want not match") | ||
| } | ||
|
|
||
| // Test unary RPC | ||
| _, err = ss.Client.EmptyCall(ctx, &testpb.Empty{}) | ||
| if err == nil { | ||
| t.Fatalf("client: EmptyCall() = <nil>, <nil>; want <nil>, error") | ||
| } | ||
| if status.Code(err) != codes.Unavailable { | ||
| t.Fatalf("client: EmptyCall() = _, %v; want _, Unavailable", err) | ||
| } | ||
| if errors.Is(err, grpc.ErrRetriesExhausted) { | ||
| t.Fatalf("client: EmptyCall() error matches ErrRetriesExhausted, want not match") | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: We could just inline the details proto here in the call to
`s.WithDetails` and get rid of the slice and the unpacking here.