Skip to content

[cronet] Support aborting requests #1797

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkgs/cronet_http/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## 1.4.1-wip
## 1.5.0-wip

* Add the ability to abort requests.
* Upgrade Cronet dependencies version.

## 1.4.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@ class UrlRequestCallbackProxy(val callback: UrlRequestCallbackInterface) : UrlRe
)

fun onResponseStarted(request: UrlRequest?, info: UrlResponseInfo?)

fun onReadCompleted(
request: UrlRequest?,
info: UrlResponseInfo?,
byteBuffer: ByteBuffer?
)

fun onSucceeded(request: UrlRequest?, info: UrlResponseInfo?)

fun onCanceled(request: UrlRequest?, info: UrlResponseInfo?)

fun onFailed(
request: UrlRequest?,
info: UrlResponseInfo?,
Expand Down Expand Up @@ -78,6 +82,10 @@ class UrlRequestCallbackProxy(val callback: UrlRequestCallbackInterface) : UrlRe
callback.onSucceeded(request, info);
}

override fun onCanceled(request: UrlRequest?, info: UrlResponseInfo?) {
callback.onCanceled(request, info);
}

override fun onFailed(
request: UrlRequest?,
info: UrlResponseInfo?,
Expand Down
2 changes: 1 addition & 1 deletion pkgs/cronet_http/example/android/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
org.gradle.jvmargs=-Xmx1536M
org.gradle.jvmargs=-Xmx12800M
org.gradle.caching=true
android.useAndroidX=true
android.enableJetifier=true
126 changes: 121 additions & 5 deletions pkgs/cronet_http/example/integration_test/client_profile_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -221,29 +221,30 @@ void main() {
late List<int> receivedData;

setUpAll(() async {
final cancelCompleter = Completer<void>();
successServer = (await HttpServer.bind('localhost', 0))
..listen((request) async {
await request.drain<void>();
request.response.headers.set('Content-Type', 'text/plain');
while (true) {
while (!cancelCompleter.isCompleted) {
request.response.write('Hello World');
await request.response.flush();
await Future<void>.delayed(const Duration(seconds: 0));
// Let the event loop run.
await Future(() {});
}
await request.response.close();
});
final cancelCompleter = Completer<void>();
successServerUri = Uri.http('localhost:${successServer.port}');
final client = CronetClientWithProfile.defaultCronetEngine();
final request = StreamedRequest('GET', successServerUri);
unawaited(request.sink.close());
final response = await client.send(request);

var i = 0;
late final StreamSubscription<List<int>> s;
receivedData = [];
s = response.stream.listen((d) {
receivedData += d;
if (++i == 1000) {
if (++i == 2) {
s.cancel();
cancelCompleter.complete();
}
Expand All @@ -263,6 +264,121 @@ void main() {
});
});

group('abort before response', () {
late HttpServer successServer;
late Uri successServerUri;
late HttpClientRequestProfile profile;

setUpAll(() async {
final abortCompleter = Completer<void>();
successServer = (await HttpServer.bind('localhost', 0))
..listen((request) async {
await request.drain<void>();
request.response.headers.set('Content-Type', 'text/plain');
await request.response.close();
});
successServerUri = Uri.http('localhost:${successServer.port}');
final client = CronetClientWithProfile.defaultCronetEngine();
final request = AbortableStreamedRequest('GET', successServerUri,
abortTrigger: abortCompleter.future);
final responseFuture = client.send(request);
abortCompleter.complete();
unawaited(request.sink.close());
try {
await responseFuture;
} on RequestAbortedException {
// Expected failure.
}
profile = client.profile!;
});
tearDownAll(() {
successServer.close();
});

test('request attributes', () async {
expect(profile.requestData.contentLength, isNull);
expect(profile.requestData.startTime, isNotNull);
expect(profile.requestData.endTime, isNotNull);
expect(profile.requestData.error, contains('aborted'));
expect(profile.responseData.bodyBytes, isEmpty);
});

test('response attributes', () {
expect(profile.responseData.bodyBytes, isEmpty);
expect(profile.responseData.compressionState, isNull);
expect(profile.responseData.contentLength, isNull);
expect(profile.responseData.endTime, isNull);
expect(profile.responseData.error, isNull);
expect(profile.responseData.headers, isNull);
expect(profile.responseData.isRedirect, isNull);
expect(profile.responseData.persistentConnection, isNull);
expect(profile.responseData.reasonPhrase, isNull);
expect(profile.responseData.redirects, isEmpty);
expect(profile.responseData.startTime, isNull);
expect(profile.responseData.statusCode, isNull);
});
});

group('abort during response', () {
late HttpServer successServer;
late Uri successServerUri;
late HttpClientRequestProfile profile;
RequestAbortedException? streamException;

setUpAll(() async {
final abortCompleter = Completer<void>();
successServer = (await HttpServer.bind('localhost', 0))
..listen((request) async {
await request.drain<void>();
request.response.headers.set('Content-Type', 'text/plain');
while (!abortCompleter.isCompleted) {
request.response.write('Hello World');
await request.response.flush();
// Let the event loop run.
await Future(() {});
}
await request.response.close();
});
successServerUri = Uri.http('localhost:${successServer.port}');
final client = CronetClientWithProfile.defaultCronetEngine();
final request = AbortableStreamedRequest('GET', successServerUri,
abortTrigger: abortCompleter.future);
unawaited(request.sink.close());
final response = await client.send(request);
var i = 0;

try {
await response.stream.listen((d) {
if (++i == 2) {
abortCompleter.complete();
}
}).asFuture<void>();
} on RequestAbortedException catch (e) {
streamException = e;
}

profile = client.profile!;
});
tearDownAll(() {
successServer.close();
});

test('stream exception', () async {
expect(streamException, isA<RequestAbortedException>());
});

test('request attributes', () async {
expect(profile.requestData.contentLength, isNull);
expect(profile.requestData.startTime, isNotNull);
expect(profile.requestData.endTime, isNotNull);
});

test('response attributes', () {
expect(profile.responseData.error, contains('aborted'));
expect(profile.responseData.bodyBytes, isNotEmpty);
});
});

group('redirects', () {
late HttpServer successServer;
late Uri successServerUri;
Expand Down
5 changes: 5 additions & 0 deletions pkgs/cronet_http/example/integration_test/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Future<void> testConformance() async {
canStreamRequestBody: false,
canReceiveSetCookieHeaders: true,
canSendCookieHeaders: true,
supportsAbort: true,
);
} finally {
HttpClientRequestProfile.profilingEnabled = profile;
Expand All @@ -36,6 +37,7 @@ Future<void> testConformance() async {
canStreamRequestBody: false,
canReceiveSetCookieHeaders: true,
canSendCookieHeaders: true,
supportsAbort: true,
);
} finally {
HttpClientRequestProfile.profilingEnabled = profile;
Expand All @@ -51,6 +53,9 @@ Future<void> testConformance() async {
return CronetClient.fromCronetEngine(engine);
},
canStreamRequestBody: false,
canReceiveSetCookieHeaders: true,
canSendCookieHeaders: true,
supportsAbort: true,
);
});
}
Expand Down
48 changes: 38 additions & 10 deletions pkgs/cronet_http/lib/src/cronet_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ jb.UrlRequestCallbackProxy$UrlRequestCallbackInterface _urlRequestCallbacks(
StreamController<List<int>>? responseStream;
JByteBuffer? jByteBuffer;
var numRedirects = 0;
var done = false;
var responseStreamCancelled = false;

// The order of callbacks generated by Cronet is documented here:
// https://developer.android.com/guide/topics/connectivity/cronet/lifecycle
Expand All @@ -208,8 +208,8 @@ jb.UrlRequestCallbackProxy$UrlRequestCallbackInterface _urlRequestCallbacks(
responseStream = StreamController(onCancel: () {
// The user did `response.stream.cancel()`. We can just pretend that
// the response completed normally.
if (done) return;
done = true;
if (responseStreamCancelled) return;
responseStreamCancelled = true;
urlRequest!.cancel();
responseStream!.sink.close();
jByteBuffer?.release();
Expand Down Expand Up @@ -260,7 +260,7 @@ jb.UrlRequestCallbackProxy$UrlRequestCallbackInterface _urlRequestCallbacks(
urlRequest?.read(jByteBuffer!);
},
onRedirectReceived: (urlRequest, responseInfo, newLocationUrl) {
if (done) return;
if (responseStreamCancelled) return;
final responseHeaders =
_cronetToClientHeaders(responseInfo!.getAllHeaders()!);

Expand Down Expand Up @@ -308,7 +308,7 @@ jb.UrlRequestCallbackProxy$UrlRequestCallbackInterface _urlRequestCallbacks(
}
},
onReadCompleted: (urlRequest, responseInfo, byteBuffer) {
if (done) return;
if (responseStreamCancelled) return;
byteBuffer!.flip();
final data = jByteBuffer!.asUint8List().sublist(0, byteBuffer.remaining);
responseStream!.add(data);
Expand All @@ -318,15 +318,15 @@ jb.UrlRequestCallbackProxy$UrlRequestCallbackInterface _urlRequestCallbacks(
urlRequest!.read(byteBuffer);
},
onSucceeded: (urlRequest, responseInfo) {
if (done) return;
done = true;
if (responseStreamCancelled) return;
responseStreamCancelled = true;
responseStream!.sink.close();
jByteBuffer?.release();
profile?.responseData.close();
},
onFailed: (urlRequest, responseInfo /* can be null */, cronetException) {
if (done) return;
done = true;
if (responseStreamCancelled) return;
responseStreamCancelled = true;
final error = ClientException(
'Cronet exception: ${cronetException.toString()}', request.url);
if (responseStream == null) {
Expand All @@ -345,6 +345,29 @@ jb.UrlRequestCallbackProxy$UrlRequestCallbackInterface _urlRequestCallbacks(
}
jByteBuffer?.release();
},
// Will always be the last callback invoked.
// See https://developer.android.com/develop/connectivity/cronet/reference/org/chromium/net/UrlRequest#cancel()
onCanceled: (urlRequest, urlResponseInfo /* can be null */) {
if (responseStreamCancelled) return;
responseStreamCancelled = true;
final error = RequestAbortedException(request.url);
if (responseStream == null) {
responseCompleter.completeError(error);
} else {
if (!responseStream!.isClosed) {
responseStream!.sink.addError(error);
responseStream!.close();
}
}
if (profile != null) {
if (profile.requestData.endTime == null) {
profile.requestData.closeWithError(error.toString());
} else {
profile.responseData.closeWithError(error.toString());
}
}
jByteBuffer?.release();
},
));
}

Expand Down Expand Up @@ -477,7 +500,12 @@ class CronetClient extends BaseClient {
builder.setUploadDataProvider(
jb.UploadDataProviders.create$2(data), _executor);
}
builder.build()!.start();

final cronetRequest = builder.build()!;
if (request case Abortable(:final abortTrigger?)) {
unawaited(abortTrigger.whenComplete(cronetRequest.cancel));
}
cronetRequest.start();
return responseCompleter.future;
}
}
Expand Down
Loading
Loading