Skip to content

Commit afda310

Browse files
authored
[cronet] Support aborting requests (#1797)
1 parent 4a90d16 commit afda310

File tree

8 files changed

+844
-588
lines changed

8 files changed

+844
-588
lines changed

pkgs/cronet_http/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
## 1.4.1-wip
1+
## 1.5.0-wip
22

3+
* Add the ability to abort requests.
34
* Upgrade Cronet dependencies version.
45

56
## 1.4.0

pkgs/cronet_http/android/src/main/kotlin/io/flutter/plugins/cronet_http/UrlRequestCallbackProxy.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,17 @@ class UrlRequestCallbackProxy(val callback: UrlRequestCallbackInterface) : UrlRe
4040
)
4141

4242
fun onResponseStarted(request: UrlRequest?, info: UrlResponseInfo?)
43+
4344
fun onReadCompleted(
4445
request: UrlRequest?,
4546
info: UrlResponseInfo?,
4647
byteBuffer: ByteBuffer?
4748
)
4849

4950
fun onSucceeded(request: UrlRequest?, info: UrlResponseInfo?)
51+
52+
fun onCanceled(request: UrlRequest?, info: UrlResponseInfo?)
53+
5054
fun onFailed(
5155
request: UrlRequest?,
5256
info: UrlResponseInfo?,
@@ -78,6 +82,10 @@ class UrlRequestCallbackProxy(val callback: UrlRequestCallbackInterface) : UrlRe
7882
callback.onSucceeded(request, info);
7983
}
8084

85+
override fun onCanceled(request: UrlRequest?, info: UrlResponseInfo?) {
86+
callback.onCanceled(request, info);
87+
}
88+
8189
override fun onFailed(
8290
request: UrlRequest?,
8391
info: UrlResponseInfo?,
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
org.gradle.jvmargs=-Xmx1536M
1+
org.gradle.jvmargs=-Xmx12800M
22
org.gradle.caching=true
33
android.useAndroidX=true
44
android.enableJetifier=true

pkgs/cronet_http/example/integration_test/client_profile_test.dart

Lines changed: 121 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -221,29 +221,30 @@ void main() {
221221
late List<int> receivedData;
222222

223223
setUpAll(() async {
224+
final cancelCompleter = Completer<void>();
224225
successServer = (await HttpServer.bind('localhost', 0))
225226
..listen((request) async {
226227
await request.drain<void>();
227228
request.response.headers.set('Content-Type', 'text/plain');
228-
while (true) {
229+
while (!cancelCompleter.isCompleted) {
229230
request.response.write('Hello World');
230231
await request.response.flush();
231-
await Future<void>.delayed(const Duration(seconds: 0));
232+
// Let the event loop run.
233+
await Future(() {});
232234
}
235+
await request.response.close();
233236
});
234-
final cancelCompleter = Completer<void>();
235237
successServerUri = Uri.http('localhost:${successServer.port}');
236238
final client = CronetClientWithProfile.defaultCronetEngine();
237239
final request = StreamedRequest('GET', successServerUri);
238240
unawaited(request.sink.close());
239241
final response = await client.send(request);
240-
241242
var i = 0;
242243
late final StreamSubscription<List<int>> s;
243244
receivedData = [];
244245
s = response.stream.listen((d) {
245246
receivedData += d;
246-
if (++i == 1000) {
247+
if (++i == 2) {
247248
s.cancel();
248249
cancelCompleter.complete();
249250
}
@@ -263,6 +264,121 @@ void main() {
263264
});
264265
});
265266

267+
group('abort before response', () {
268+
late HttpServer successServer;
269+
late Uri successServerUri;
270+
late HttpClientRequestProfile profile;
271+
272+
setUpAll(() async {
273+
final abortCompleter = Completer<void>();
274+
successServer = (await HttpServer.bind('localhost', 0))
275+
..listen((request) async {
276+
await request.drain<void>();
277+
request.response.headers.set('Content-Type', 'text/plain');
278+
await request.response.close();
279+
});
280+
successServerUri = Uri.http('localhost:${successServer.port}');
281+
final client = CronetClientWithProfile.defaultCronetEngine();
282+
final request = AbortableStreamedRequest('GET', successServerUri,
283+
abortTrigger: abortCompleter.future);
284+
final responseFuture = client.send(request);
285+
abortCompleter.complete();
286+
unawaited(request.sink.close());
287+
try {
288+
await responseFuture;
289+
} on RequestAbortedException {
290+
// Expected failure.
291+
}
292+
profile = client.profile!;
293+
});
294+
tearDownAll(() {
295+
successServer.close();
296+
});
297+
298+
test('request attributes', () async {
299+
expect(profile.requestData.contentLength, isNull);
300+
expect(profile.requestData.startTime, isNotNull);
301+
expect(profile.requestData.endTime, isNotNull);
302+
expect(profile.requestData.error, contains('aborted'));
303+
expect(profile.responseData.bodyBytes, isEmpty);
304+
});
305+
306+
test('response attributes', () {
307+
expect(profile.responseData.bodyBytes, isEmpty);
308+
expect(profile.responseData.compressionState, isNull);
309+
expect(profile.responseData.contentLength, isNull);
310+
expect(profile.responseData.endTime, isNull);
311+
expect(profile.responseData.error, isNull);
312+
expect(profile.responseData.headers, isNull);
313+
expect(profile.responseData.isRedirect, isNull);
314+
expect(profile.responseData.persistentConnection, isNull);
315+
expect(profile.responseData.reasonPhrase, isNull);
316+
expect(profile.responseData.redirects, isEmpty);
317+
expect(profile.responseData.startTime, isNull);
318+
expect(profile.responseData.statusCode, isNull);
319+
});
320+
});
321+
322+
group('abort during response', () {
323+
late HttpServer successServer;
324+
late Uri successServerUri;
325+
late HttpClientRequestProfile profile;
326+
RequestAbortedException? streamException;
327+
328+
setUpAll(() async {
329+
final abortCompleter = Completer<void>();
330+
successServer = (await HttpServer.bind('localhost', 0))
331+
..listen((request) async {
332+
await request.drain<void>();
333+
request.response.headers.set('Content-Type', 'text/plain');
334+
while (!abortCompleter.isCompleted) {
335+
request.response.write('Hello World');
336+
await request.response.flush();
337+
// Let the event loop run.
338+
await Future(() {});
339+
}
340+
await request.response.close();
341+
});
342+
successServerUri = Uri.http('localhost:${successServer.port}');
343+
final client = CronetClientWithProfile.defaultCronetEngine();
344+
final request = AbortableStreamedRequest('GET', successServerUri,
345+
abortTrigger: abortCompleter.future);
346+
unawaited(request.sink.close());
347+
final response = await client.send(request);
348+
var i = 0;
349+
350+
try {
351+
await response.stream.listen((d) {
352+
if (++i == 2) {
353+
abortCompleter.complete();
354+
}
355+
}).asFuture<void>();
356+
} on RequestAbortedException catch (e) {
357+
streamException = e;
358+
}
359+
360+
profile = client.profile!;
361+
});
362+
tearDownAll(() {
363+
successServer.close();
364+
});
365+
366+
test('stream exception', () async {
367+
expect(streamException, isA<RequestAbortedException>());
368+
});
369+
370+
test('request attributes', () async {
371+
expect(profile.requestData.contentLength, isNull);
372+
expect(profile.requestData.startTime, isNotNull);
373+
expect(profile.requestData.endTime, isNotNull);
374+
});
375+
376+
test('response attributes', () {
377+
expect(profile.responseData.error, contains('aborted'));
378+
expect(profile.responseData.bodyBytes, isNotEmpty);
379+
});
380+
});
381+
266382
group('redirects', () {
267383
late HttpServer successServer;
268384
late Uri successServerUri;

pkgs/cronet_http/example/integration_test/client_test.dart

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ Future<void> testConformance() async {
2222
canStreamRequestBody: false,
2323
canReceiveSetCookieHeaders: true,
2424
canSendCookieHeaders: true,
25+
supportsAbort: true,
2526
);
2627
} finally {
2728
HttpClientRequestProfile.profilingEnabled = profile;
@@ -36,6 +37,7 @@ Future<void> testConformance() async {
3637
canStreamRequestBody: false,
3738
canReceiveSetCookieHeaders: true,
3839
canSendCookieHeaders: true,
40+
supportsAbort: true,
3941
);
4042
} finally {
4143
HttpClientRequestProfile.profilingEnabled = profile;
@@ -51,6 +53,9 @@ Future<void> testConformance() async {
5153
return CronetClient.fromCronetEngine(engine);
5254
},
5355
canStreamRequestBody: false,
56+
canReceiveSetCookieHeaders: true,
57+
canSendCookieHeaders: true,
58+
supportsAbort: true,
5459
);
5560
});
5661
}

pkgs/cronet_http/lib/src/cronet_client.dart

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ jb.UrlRequestCallbackProxy$UrlRequestCallbackInterface _urlRequestCallbacks(
195195
StreamController<List<int>>? responseStream;
196196
JByteBuffer? jByteBuffer;
197197
var numRedirects = 0;
198-
var done = false;
198+
var responseStreamCancelled = false;
199199

200200
// The order of callbacks generated by Cronet is documented here:
201201
// https://developer.android.com/guide/topics/connectivity/cronet/lifecycle
@@ -208,8 +208,8 @@ jb.UrlRequestCallbackProxy$UrlRequestCallbackInterface _urlRequestCallbacks(
208208
responseStream = StreamController(onCancel: () {
209209
// The user did `response.stream.cancel()`. We can just pretend that
210210
// the response completed normally.
211-
if (done) return;
212-
done = true;
211+
if (responseStreamCancelled) return;
212+
responseStreamCancelled = true;
213213
urlRequest!.cancel();
214214
responseStream!.sink.close();
215215
jByteBuffer?.release();
@@ -260,7 +260,7 @@ jb.UrlRequestCallbackProxy$UrlRequestCallbackInterface _urlRequestCallbacks(
260260
urlRequest?.read(jByteBuffer!);
261261
},
262262
onRedirectReceived: (urlRequest, responseInfo, newLocationUrl) {
263-
if (done) return;
263+
if (responseStreamCancelled) return;
264264
final responseHeaders =
265265
_cronetToClientHeaders(responseInfo!.getAllHeaders()!);
266266

@@ -308,7 +308,7 @@ jb.UrlRequestCallbackProxy$UrlRequestCallbackInterface _urlRequestCallbacks(
308308
}
309309
},
310310
onReadCompleted: (urlRequest, responseInfo, byteBuffer) {
311-
if (done) return;
311+
if (responseStreamCancelled) return;
312312
byteBuffer!.flip();
313313
final data = jByteBuffer!.asUint8List().sublist(0, byteBuffer.remaining);
314314
responseStream!.add(data);
@@ -318,15 +318,15 @@ jb.UrlRequestCallbackProxy$UrlRequestCallbackInterface _urlRequestCallbacks(
318318
urlRequest!.read(byteBuffer);
319319
},
320320
onSucceeded: (urlRequest, responseInfo) {
321-
if (done) return;
322-
done = true;
321+
if (responseStreamCancelled) return;
322+
responseStreamCancelled = true;
323323
responseStream!.sink.close();
324324
jByteBuffer?.release();
325325
profile?.responseData.close();
326326
},
327327
onFailed: (urlRequest, responseInfo /* can be null */, cronetException) {
328-
if (done) return;
329-
done = true;
328+
if (responseStreamCancelled) return;
329+
responseStreamCancelled = true;
330330
final error = ClientException(
331331
'Cronet exception: ${cronetException.toString()}', request.url);
332332
if (responseStream == null) {
@@ -345,6 +345,29 @@ jb.UrlRequestCallbackProxy$UrlRequestCallbackInterface _urlRequestCallbacks(
345345
}
346346
jByteBuffer?.release();
347347
},
348+
// Will always be the last callback invoked.
349+
// See https://developer.android.com/develop/connectivity/cronet/reference/org/chromium/net/UrlRequest#cancel()
350+
onCanceled: (urlRequest, urlResponseInfo /* can be null */) {
351+
if (responseStreamCancelled) return;
352+
responseStreamCancelled = true;
353+
final error = RequestAbortedException(request.url);
354+
if (responseStream == null) {
355+
responseCompleter.completeError(error);
356+
} else {
357+
if (!responseStream!.isClosed) {
358+
responseStream!.sink.addError(error);
359+
responseStream!.close();
360+
}
361+
}
362+
if (profile != null) {
363+
if (profile.requestData.endTime == null) {
364+
profile.requestData.closeWithError(error.toString());
365+
} else {
366+
profile.responseData.closeWithError(error.toString());
367+
}
368+
}
369+
jByteBuffer?.release();
370+
},
348371
));
349372
}
350373

@@ -477,7 +500,12 @@ class CronetClient extends BaseClient {
477500
builder.setUploadDataProvider(
478501
jb.UploadDataProviders.create$2(data), _executor);
479502
}
480-
builder.build()!.start();
503+
504+
final cronetRequest = builder.build()!;
505+
if (request case Abortable(:final abortTrigger?)) {
506+
unawaited(abortTrigger.whenComplete(cronetRequest.cancel));
507+
}
508+
cronetRequest.start();
481509
return responseCompleter.future;
482510
}
483511
}

0 commit comments

Comments
 (0)