@@ -28,7 +28,6 @@ goog.module('grpc.web.GrpcWebClientBase');
28
28
goog . module . declareLegacyNamespace ( ) ;
29
29
30
30
31
- const AbstractClientBase = goog . require ( 'grpc.web.AbstractClientBase' ) ;
32
31
const ClientOptions = goog . requireType ( 'grpc.web.ClientOptions' ) ;
33
32
const ClientReadableStream = goog . require ( 'grpc.web.ClientReadableStream' ) ;
34
33
const ClientUnaryCallImpl = goog . require ( 'grpc.web.ClientUnaryCallImpl' ) ;
@@ -40,6 +39,7 @@ const RpcError = goog.require('grpc.web.RpcError');
40
39
const StatusCode = goog . require ( 'grpc.web.StatusCode' ) ;
41
40
const XhrIo = goog . require ( 'goog.net.XhrIo' ) ;
42
41
const googCrypt = goog . require ( 'goog.crypt.base64' ) ;
42
+ const { AbstractClientBase, PromiseCallOptions, getHostname} = goog . require ( 'grpc.web.AbstractClientBase' ) ;
43
43
const { Status} = goog . require ( 'grpc.web.Status' ) ;
44
44
const { StreamInterceptor, UnaryInterceptor} = goog . require ( 'grpc.web.Interceptor' ) ;
45
45
const { toObject} = goog . require ( 'goog.collections.maps' ) ;
@@ -101,7 +101,7 @@ class GrpcWebClientBase {
101
101
* @export
102
102
*/
103
103
rpcCall ( method , requestMessage , metadata , methodDescriptor , callback ) {
104
- const hostname = AbstractClientBase . getHostname ( method , methodDescriptor ) ;
104
+ const hostname = getHostname ( method , methodDescriptor ) ;
105
105
const invoker = GrpcWebClientBase . runInterceptors_ (
106
106
( request ) => this . startStream_ ( request , hostname ) ,
107
107
this . streamInterceptors_ ) ;
@@ -112,18 +112,35 @@ class GrpcWebClientBase {
112
112
}
113
113
114
114
/**
115
- * @override
116
- * @export
115
+ * @param {string } method The method to invoke
116
+ * @param {REQUEST } requestMessage The request proto
117
+ * @param {!Object<string, string> } metadata User defined call metadata
118
+ * @param {!MethodDescriptor<REQUEST, RESPONSE> } methodDescriptor
119
+ * @param {?PromiseCallOptions= } options Options for the call
120
+ * @return {!Promise<RESPONSE> }
121
+ * @template REQUEST, RESPONSE
117
122
*/
118
- thenableCall ( method , requestMessage , metadata , methodDescriptor ) {
119
- const hostname = AbstractClientBase . getHostname ( method , methodDescriptor ) ;
123
+ thenableCall (
124
+ method , requestMessage , metadata , methodDescriptor , options = { } ) {
125
+ const hostname = getHostname ( method , methodDescriptor ) ;
126
+ const signal = options && options . signal ;
120
127
const initialInvoker = ( request ) => new Promise ( ( resolve , reject ) => {
128
+ // If the signal is already aborted, immediately reject the promise
129
+ // and don't issue the call.
130
+ if ( signal && signal . aborted ) {
131
+ const error = new RpcError ( StatusCode . CANCELLED , 'Aborted' ) ;
132
+ error . cause = signal . reason ;
133
+ reject ( error ) ;
134
+ return ;
135
+ }
136
+
121
137
const stream = this . startStream_ ( request , hostname ) ;
122
138
let unaryMetadata ;
123
139
let unaryStatus ;
124
140
let unaryMsg ;
125
141
GrpcWebClientBase . setCallback_ (
126
- stream , ( error , response , status , metadata , unaryResponseReceived ) => {
142
+ stream ,
143
+ ( error , response , status , metadata , unaryResponseReceived ) => {
127
144
if ( error ) {
128
145
reject ( error ) ;
129
146
} else if ( unaryResponseReceived ) {
@@ -136,7 +153,19 @@ class GrpcWebClientBase {
136
153
resolve ( request . getMethodDescriptor ( ) . createUnaryResponse (
137
154
unaryMsg , unaryMetadata , unaryStatus ) ) ;
138
155
}
139
- } , true ) ;
156
+ } ,
157
+ true ) ;
158
+
159
+ // Wire up cancellation from the abort signal, if any.
160
+ if ( signal ) {
161
+ signal . addEventListener ( 'abort' , ( ) => {
162
+ stream . cancel ( ) ;
163
+
164
+ const error = new RpcError ( StatusCode . CANCELLED , 'Aborted' ) ;
165
+ error . cause = /** @type {!AbortSignal } */ ( signal ) . reason ;
166
+ reject ( error ) ;
167
+ } ) ;
168
+ }
140
169
} ) ;
141
170
const invoker = GrpcWebClientBase . runInterceptors_ (
142
171
initialInvoker , this . unaryInterceptors_ ) ;
@@ -152,20 +181,21 @@ class GrpcWebClientBase {
152
181
* @param {!Object<string, string> } metadata User defined call metadata
153
182
* @param {!MethodDescriptor<REQUEST, RESPONSE> } methodDescriptor Information
154
183
* of this RPC method
184
+ * @param {?PromiseCallOptions= } options Options for the call
155
185
* @return {!Promise<RESPONSE> }
156
186
* @template REQUEST, RESPONSE
157
187
*/
158
- unaryCall ( method , requestMessage , metadata , methodDescriptor ) {
159
- return /** @type {!Promise<RESPONSE> }*/ (
160
- this . thenableCall ( method , requestMessage , metadata , methodDescriptor ) ) ;
188
+ unaryCall ( method , requestMessage , metadata , methodDescriptor , options = { } ) {
189
+ return /** @type {!Promise<RESPONSE> }*/ ( this . thenableCall (
190
+ method , requestMessage , metadata , methodDescriptor , options ) ) ;
161
191
}
162
192
163
193
/**
164
194
* @override
165
195
* @export
166
196
*/
167
197
serverStreaming ( method , requestMessage , metadata , methodDescriptor ) {
168
- const hostname = AbstractClientBase . getHostname ( method , methodDescriptor ) ;
198
+ const hostname = getHostname ( method , methodDescriptor ) ;
169
199
const invoker = GrpcWebClientBase . runInterceptors_ (
170
200
( request ) => this . startStream_ ( request , hostname ) ,
171
201
this . streamInterceptors_ ) ;
@@ -279,7 +309,9 @@ class GrpcWebClientBase {
279
309
message : 'Incomplete response' ,
280
310
} ) ;
281
311
} else if ( useUnaryResponse ) {
282
- callback ( null , responseReceived , null , null , /* unaryResponseReceived= */ true ) ;
312
+ callback (
313
+ null , responseReceived , null , null ,
314
+ /* unaryResponseReceived= */ true ) ;
283
315
} else {
284
316
callback ( null , responseReceived ) ;
285
317
}
@@ -368,12 +400,9 @@ class GrpcWebClientBase {
368
400
* (!Promise<RESPONSE>|!ClientReadableStream<RESPONSE>)}
369
401
*/
370
402
static runInterceptors_ ( invoker , interceptors ) {
371
- let curInvoker = invoker ;
372
- interceptors . forEach ( ( interceptor ) => {
373
- const lastInvoker = curInvoker ;
374
- curInvoker = ( request ) => interceptor . intercept ( request , lastInvoker ) ;
375
- } ) ;
376
- return curInvoker ;
403
+ return interceptors . reduce ( ( accumulatedInvoker , interceptor ) => {
404
+ return ( request ) => interceptor . intercept ( request , accumulatedInvoker ) ;
405
+ } , invoker ) ;
377
406
}
378
407
}
379
408
0 commit comments