Skip to content

Commit 587484f

Browse files
committed
Add abort API support for promise calls
For grpc#1478
1 parent b5ff5d3 commit 587484f

File tree

3 files changed

+76
-35
lines changed

3 files changed

+76
-35
lines changed

docker-compose.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
version: '3'
21
services:
32
prereqs:
43
build:

javascript/net/grpc/web/abstractclientbase.js

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,20 @@ const MethodDescriptor = goog.require('grpc.web.MethodDescriptor');
3232
const RpcError = goog.require('grpc.web.RpcError');
3333

3434

35+
/**
36+
* @constructor
37+
* @struct
38+
* @final
39+
*/
40+
const PromiseCallOptions = function() {};
41+
42+
/**
43+
* An AbortSignal to abort the call.
44+
* @type {AbortSignal|undefined}
45+
*/
46+
PromiseCallOptions.prototype.signal;
47+
48+
3549
/**
3650
* This interface represents a grpc-web client
3751
* @interface
@@ -62,10 +76,11 @@ const AbstractClientBase = class {
6276
* @param {!Object<string, string>} metadata User defined call metadata
6377
* @param {!MethodDescriptor<REQUEST, RESPONSE>}
6478
* methodDescriptor Information of this RPC method
79+
* @param options Options for the call
6580
* @return {!IThenable<RESPONSE>}
6681
* A promise that resolves to the response message
6782
*/
68-
thenableCall(method, requestMessage, metadata, methodDescriptor) {}
83+
thenableCall(method, requestMessage, metadata, methodDescriptor, options) {}
6984

7085
/**
7186
* @abstract
@@ -78,21 +93,19 @@ const AbstractClientBase = class {
7893
* @return {!ClientReadableStream<RESPONSE>} The Client Readable Stream
7994
*/
8095
serverStreaming(method, requestMessage, metadata, methodDescriptor) {}
81-
82-
/**
83-
* Get the hostname of the current request.
84-
* @static
85-
* @template REQUEST, RESPONSE
86-
* @param {string} method
87-
* @param {!MethodDescriptor<REQUEST,RESPONSE>} methodDescriptor
88-
* @return {string}
89-
*/
90-
static getHostname(method, methodDescriptor) {
91-
// method = hostname + methodDescriptor.name(relative path of this method)
92-
return method.substr(0, method.length - methodDescriptor.name.length);
93-
}
9496
};
9597

98+
/**
99+
* Get the hostname of the current request.
100+
* @template REQUEST, RESPONSE
101+
* @param {string} method
102+
* @param {!MethodDescriptor<REQUEST,RESPONSE>} methodDescriptor
103+
* @return {string}
104+
*/
105+
function getHostname(method, methodDescriptor) {
106+
// method = hostname + methodDescriptor.name(relative path of this method)
107+
return method.substr(0, method.length - methodDescriptor.name.length);
108+
}
96109

97110

98-
exports = AbstractClientBase;
111+
exports = {AbstractClientBase, PromiseCallOptions, getHostname};

javascript/net/grpc/web/grpcwebclientbase.js

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ goog.module('grpc.web.GrpcWebClientBase');
2828
goog.module.declareLegacyNamespace();
2929

3030

31-
const AbstractClientBase = goog.require('grpc.web.AbstractClientBase');
3231
const ClientOptions = goog.requireType('grpc.web.ClientOptions');
3332
const ClientReadableStream = goog.require('grpc.web.ClientReadableStream');
3433
const ClientUnaryCallImpl = goog.require('grpc.web.ClientUnaryCallImpl');
@@ -40,6 +39,7 @@ const RpcError = goog.require('grpc.web.RpcError');
4039
const StatusCode = goog.require('grpc.web.StatusCode');
4140
const XhrIo = goog.require('goog.net.XhrIo');
4241
const googCrypt = goog.require('goog.crypt.base64');
42+
const {AbstractClientBase, PromiseCallOptions, getHostname} = goog.require('grpc.web.AbstractClientBase');
4343
const {Status} = goog.require('grpc.web.Status');
4444
const {StreamInterceptor, UnaryInterceptor} = goog.require('grpc.web.Interceptor');
4545
const {toObject} = goog.require('goog.collections.maps');
@@ -101,7 +101,7 @@ class GrpcWebClientBase {
101101
* @export
102102
*/
103103
rpcCall(method, requestMessage, metadata, methodDescriptor, callback) {
104-
const hostname = AbstractClientBase.getHostname(method, methodDescriptor);
104+
const hostname = getHostname(method, methodDescriptor);
105105
const invoker = GrpcWebClientBase.runInterceptors_(
106106
(request) => this.startStream_(request, hostname),
107107
this.streamInterceptors_);
@@ -112,18 +112,35 @@ class GrpcWebClientBase {
112112
}
113113

114114
/**
115-
* @override
116-
* @export
115+
* @param {string} method The method to invoke
116+
* @param {REQUEST} requestMessage The request proto
117+
* @param {!Object<string, string>} metadata User defined call metadata
118+
* @param {!MethodDescriptor<REQUEST, RESPONSE>} methodDescriptor
119+
* @param {?PromiseCallOptions=} options Options for the call
120+
* @return {!Promise<RESPONSE>}
121+
* @template REQUEST, RESPONSE
117122
*/
118-
thenableCall(method, requestMessage, metadata, methodDescriptor) {
119-
const hostname = AbstractClientBase.getHostname(method, methodDescriptor);
123+
thenableCall(
124+
method, requestMessage, metadata, methodDescriptor, options = {}) {
125+
const hostname = getHostname(method, methodDescriptor);
126+
const signal = options && options.signal;
120127
const initialInvoker = (request) => new Promise((resolve, reject) => {
128+
// If the signal is already aborted, immediately reject the promise
129+
// and don't issue the call.
130+
if (signal && signal.aborted) {
131+
const error = new RpcError(StatusCode.CANCELLED, 'Aborted');
132+
error.cause = signal.reason;
133+
reject(error);
134+
return;
135+
}
136+
121137
const stream = this.startStream_(request, hostname);
122138
let unaryMetadata;
123139
let unaryStatus;
124140
let unaryMsg;
125141
GrpcWebClientBase.setCallback_(
126-
stream, (error, response, status, metadata, unaryResponseReceived) => {
142+
stream,
143+
(error, response, status, metadata, unaryResponseReceived) => {
127144
if (error) {
128145
reject(error);
129146
} else if (unaryResponseReceived) {
@@ -136,7 +153,19 @@ class GrpcWebClientBase {
136153
resolve(request.getMethodDescriptor().createUnaryResponse(
137154
unaryMsg, unaryMetadata, unaryStatus));
138155
}
139-
}, true);
156+
},
157+
true);
158+
159+
// Wire up cancellation from the abort signal, if any.
160+
if (signal) {
161+
signal.addEventListener('abort', () => {
162+
stream.cancel();
163+
164+
const error = new RpcError(StatusCode.CANCELLED, 'Aborted');
165+
error.cause = /** @type {!AbortSignal} */ (signal).reason;
166+
reject(error);
167+
});
168+
}
140169
});
141170
const invoker = GrpcWebClientBase.runInterceptors_(
142171
initialInvoker, this.unaryInterceptors_);
@@ -152,20 +181,21 @@ class GrpcWebClientBase {
152181
* @param {!Object<string, string>} metadata User defined call metadata
153182
* @param {!MethodDescriptor<REQUEST, RESPONSE>} methodDescriptor Information
154183
* of this RPC method
184+
* @param {?PromiseCallOptions=} options Options for the call
155185
* @return {!Promise<RESPONSE>}
156186
* @template REQUEST, RESPONSE
157187
*/
158-
unaryCall(method, requestMessage, metadata, methodDescriptor) {
159-
return /** @type {!Promise<RESPONSE>}*/ (
160-
this.thenableCall(method, requestMessage, metadata, methodDescriptor));
188+
unaryCall(method, requestMessage, metadata, methodDescriptor, options = {}) {
189+
return /** @type {!Promise<RESPONSE>}*/ (this.thenableCall(
190+
method, requestMessage, metadata, methodDescriptor, options));
161191
}
162192

163193
/**
164194
* @override
165195
* @export
166196
*/
167197
serverStreaming(method, requestMessage, metadata, methodDescriptor) {
168-
const hostname = AbstractClientBase.getHostname(method, methodDescriptor);
198+
const hostname = getHostname(method, methodDescriptor);
169199
const invoker = GrpcWebClientBase.runInterceptors_(
170200
(request) => this.startStream_(request, hostname),
171201
this.streamInterceptors_);
@@ -279,7 +309,9 @@ class GrpcWebClientBase {
279309
message: 'Incomplete response',
280310
});
281311
} else if (useUnaryResponse) {
282-
callback(null, responseReceived, null, null, /* unaryResponseReceived= */ true);
312+
callback(
313+
null, responseReceived, null, null,
314+
/* unaryResponseReceived= */ true);
283315
} else {
284316
callback(null, responseReceived);
285317
}
@@ -368,12 +400,9 @@ class GrpcWebClientBase {
368400
* (!Promise<RESPONSE>|!ClientReadableStream<RESPONSE>)}
369401
*/
370402
static runInterceptors_(invoker, interceptors) {
371-
let curInvoker = invoker;
372-
interceptors.forEach((interceptor) => {
373-
const lastInvoker = curInvoker;
374-
curInvoker = (request) => interceptor.intercept(request, lastInvoker);
375-
});
376-
return curInvoker;
403+
return interceptors.reduce((accumulatedInvoker, interceptor) => {
404+
return (request) => interceptor.intercept(request, accumulatedInvoker);
405+
}, invoker);
377406
}
378407
}
379408

0 commit comments

Comments
 (0)