diff --git a/pkgs/http_client_conformance_tests/lib/http_client_conformance_tests.dart b/pkgs/http_client_conformance_tests/lib/http_client_conformance_tests.dart index 0686c9d09c..12884f25e7 100644 --- a/pkgs/http_client_conformance_tests/lib/http_client_conformance_tests.dart +++ b/pkgs/http_client_conformance_tests/lib/http_client_conformance_tests.dart @@ -4,6 +4,7 @@ import 'package:http/http.dart'; +import 'src/abort_tests.dart'; import 'src/close_tests.dart'; import 'src/compressed_response_body_tests.dart'; import 'src/isolate_test.dart'; @@ -22,6 +23,7 @@ import 'src/response_headers_tests.dart'; import 'src/response_status_line_tests.dart'; import 'src/server_errors_test.dart'; +export 'src/abort_tests.dart' show testAbort; export 'src/close_tests.dart' show testClose; export 'src/compressed_response_body_tests.dart' show testCompressedResponseBody; @@ -49,7 +51,7 @@ export 'src/server_errors_test.dart' show testServerErrors; // /// If [canStreamResponseBody] is `false` then tests that assume that the /// [Client] supports receiving HTTP responses with unbounded body sizes will -/// be skipped +/// be skipped. /// /// If [redirectAlwaysAllowed] is `true` then tests that require the [Client] /// to limit redirects will be skipped. @@ -75,6 +77,8 @@ export 'src/server_errors_test.dart' show testServerErrors; /// If [supportsMultipartRequest] is `false` then tests that assume that /// multipart requests can be sent will be skipped. /// +/// If [supportsAbort] is `false` then tests that assume that requests can be +/// aborted will be skipped. /// The tests are run against a series of HTTP servers that are started by the /// tests. If the tests are run in the browser, then the test servers are /// started in another process. Otherwise, the test servers are run in-process. @@ -90,6 +94,8 @@ void testAll( bool canSendCookieHeaders = false, bool canReceiveSetCookieHeaders = false, bool supportsMultipartRequest = true, + // TODO: make this false, for now true to see what breaks. + bool supportsAbort = true, }) { testRequestBody(clientFactory()); testRequestBodyStreamed(clientFactory(), @@ -116,4 +122,8 @@ void testAll( canSendCookieHeaders: canSendCookieHeaders); testResponseCookies(clientFactory(), canReceiveSetCookieHeaders: canReceiveSetCookieHeaders); + testAbort(clientFactory(), + supportsAbort: supportsAbort, + canStreamRequestBody: canStreamRequestBody, + canStreamResponseBody: canStreamResponseBody); } diff --git a/pkgs/http_client_conformance_tests/lib/src/abort_server.dart b/pkgs/http_client_conformance_tests/lib/src/abort_server.dart new file mode 100644 index 0000000000..c66454e18a --- /dev/null +++ b/pkgs/http_client_conformance_tests/lib/src/abort_server.dart @@ -0,0 +1,43 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'package:async/async.dart'; +import 'package:stream_channel/stream_channel.dart'; + +/// Starts an HTTP server that sends a stream of integers. +/// +/// Channel protocol: +/// On Startup: +/// - send port +/// When Receive Anything: +/// - close current request +/// - exit server +void hybridMain(StreamChannel channel) async { + final channelQueue = StreamQueue(channel.stream); + + late HttpServer server; + server = (await HttpServer.bind('localhost', 0)) + ..listen((request) async { + // TODO: might have to ignore exceptions in the server because it will + // probably be disconnected + + await request.drain(); + request.response.headers.set('Access-Control-Allow-Origin', '*'); + request.response.headers.set('Content-Type', 'text/plain'); + + for (var i = 0; i < 10000; ++i) { + request.response.write('$i\n'); + await request.response.flush(); + // Let the event loop run. + await Future.delayed(const Duration()); + } + await request.response.close(); + }); + + channel.sink.add(server.port); + unawaited(channelQueue.next.then((value) => unawaited(server.close()))); +} diff --git a/pkgs/http_client_conformance_tests/lib/src/abort_server_vm.dart b/pkgs/http_client_conformance_tests/lib/src/abort_server_vm.dart new file mode 100644 index 0000000000..eddcd8a671 --- /dev/null +++ b/pkgs/http_client_conformance_tests/lib/src/abort_server_vm.dart @@ -0,0 +1,14 @@ +// Generated by generate_server_wrappers.dart. Do not edit. + +import 'package:stream_channel/stream_channel.dart'; + +import 'abort_server.dart'; + +export 'server_queue_helpers.dart' show StreamQueueOfNullableObjectExtension; + +/// Starts the redirect test HTTP server in the same process. +Future> startServer() async { + final controller = StreamChannelController(sync: true); + hybridMain(controller.foreign); + return controller.local; +} diff --git a/pkgs/http_client_conformance_tests/lib/src/abort_server_web.dart b/pkgs/http_client_conformance_tests/lib/src/abort_server_web.dart new file mode 100644 index 0000000000..0b3520def8 --- /dev/null +++ b/pkgs/http_client_conformance_tests/lib/src/abort_server_web.dart @@ -0,0 +1,14 @@ +// Generated by generate_server_wrappers.dart. Do not edit. + +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +export 'server_queue_helpers.dart' show StreamQueueOfNullableObjectExtension; + +/// Starts the redirect test HTTP server out-of-process. +Future> startServer() async => spawnHybridUri( + Uri( + scheme: 'package', + path: 'http_client_conformance_tests/src/abort_server.dart', + ), +); diff --git a/pkgs/http_client_conformance_tests/lib/src/abort_tests.dart b/pkgs/http_client_conformance_tests/lib/src/abort_tests.dart new file mode 100644 index 0000000000..be6d828b66 --- /dev/null +++ b/pkgs/http_client_conformance_tests/lib/src/abort_tests.dart @@ -0,0 +1,124 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:async/async.dart'; +import 'package:http/http.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +import 'abort_server_vm.dart' + if (dart.library.js_interop) 'abort_server_web.dart'; + +/// Tests that the client supports aborting requests. +/// +/// If [supportsAbort] is `false` then tests that assume that requests can be +/// aborted will be skipped. +/// +/// If [canStreamResponseBody] is `false` then tests that assume that the +/// [Client] supports receiving HTTP responses with unbounded body sizes will +/// be skipped. +/// +/// If [canStreamRequestBody] is `false` then tests that assume that the +/// [Client] supports sending HTTP requests with unbounded body sizes will be +/// skipped. +void testAbort( + Client client, { + bool supportsAbort = true, + bool canStreamRequestBody = true, + bool canStreamResponseBody = true, +}) { + group('abort', () { + late String host; + late StreamChannel httpServerChannel; + late StreamQueue httpServerQueue; + late Uri serverUrl; + + setUp(() async { + httpServerChannel = await startServer(); + httpServerQueue = StreamQueue(httpServerChannel.stream); + host = 'localhost:${await httpServerQueue.nextAsInt}'; + serverUrl = Uri.http(host, ''); + }); + tearDownAll(() => httpServerChannel.sink.add(null)); + + test('before request', () async { + final request = Request('GET', serverUrl); + + // TODO: Trigger abort + + expect( + client.send(request), + throwsA( + isA().having((e) => e.uri, 'uri', serverUrl))); + }); + + test('during request stream', () async { + final request = StreamedRequest('POST', serverUrl); + + final response = client.send(request); + request.sink.add('Hello World'.codeUnits); + // TODO: Trigger abort + + expect( + response, + throwsA( + isA().having((e) => e.uri, 'uri', serverUrl))); + await request + .sink.done; // Verify that the stream subscription was cancelled. + }, skip: canStreamRequestBody ? false : 'does not stream request bodies'); + + test('after response', () async { + final request = Request('GET', serverUrl); + + final response = await client.send(request); + + // TODO: Trigger abort + + expect( + response.stream.single, + throwsA( + isA().having((e) => e.uri, 'uri', serverUrl))); + }); + + test('while streaming response', () async { + final request = Request('GET', serverUrl); + + final response = await client.send(request); + + var i = 0; + expect( + response.stream.listen((data) { + ++i; + if (i == 1000) { + // TODO: Trigger abort + } + }).asFuture(), + throwsA( + isA().having((e) => e.uri, 'uri', serverUrl))); + expect(i, 1000); + }, skip: canStreamResponseBody ? false : 'does not stream response bodies'); + + test('after streaming response', () async { + final request = Request('GET', serverUrl); + + final response = await client.send(request); + await response.stream.drain(); + // Trigger abort, should have no effect. + }); + + test('after response, client still useable', () async { + final request = Request('GET', serverUrl); + + final abortResponse = await client.send(request); + // TODO: Trigger abort + try { + await abortResponse.stream.drain(); + } on ClientException {} + + final response = await client.get(serverUrl); + expect(response.statusCode, 200); + expect(response.body, endsWith('10000\n')); + }); + }); +}