diff --git a/src/Illuminate/Http/Client/Response.php b/src/Illuminate/Http/Client/Response.php index 27f0899cb517..89eb0695e509 100644 --- a/src/Illuminate/Http/Client/Response.php +++ b/src/Illuminate/Http/Client/Response.php @@ -3,6 +3,7 @@ namespace Illuminate\Http\Client; use ArrayAccess; +use Generator; use GuzzleHttp\Psr7\StreamWrapper; use Illuminate\Support\Collection; use Illuminate\Support\Fluent; @@ -74,6 +75,46 @@ public function body() return (string) $this->response->getBody(); } + /** + * Stream the response body in chunks of the given length. + * + * @param int $chunkLength + * @return \Generator + */ + public function stream(int $chunkLength = 1): Generator + { + $stream = $this->getBody(); + + while (! $stream->eof()) { + yield $stream->read($chunkLength); + } + } + + /** + * Stream the response body line by line, buffering chunks to ensure complete lines. + * + * @param int $chunkLength + * @param string $separator + * @return \Generator + */ + public function streamLines(int $chunkLength = 1, string $separator = "\n"): Generator + { + $buffer = ''; + + foreach ($this->stream($chunkLength) as $chunk) { + $buffer .= $chunk; + + while (($pos = strpos($buffer, $separator)) !== false) { + yield substr($buffer, 0, $pos); + $buffer = substr($buffer, $pos + mb_strlen($separator)); + } + } + + if ($buffer !== '') { + yield $buffer; + } + } + /** * Get the JSON decoded body of the response as an array or scalar value. * diff --git a/tests/Http/HttpClientStreamTest.php b/tests/Http/HttpClientStreamTest.php new file mode 100644 index 000000000000..2af7579b872b --- /dev/null +++ b/tests/Http/HttpClientStreamTest.php @@ -0,0 +1,147 @@ + fn () => Http::response(implode($messages))]); + } + + public function testStreamYieldsOneChunkPerMessageWhenChunkSizeMatchesMessageSize() + { + $stream = Http::withOptions(['stream' => true]) + ->get('http://example.test/stream') + ->stream(chunkLength: 19); + + $expectedMessages = [ + "Streamed Message 1\n", + "Streamed Message 2\n", + "Streamed Message 3\n", + "Streamed Message 4\n", + 'Streamed Message 5', + ]; + + $chunkIndex = 0; + foreach ($stream as $chunk) { + $this->assertSame($expectedMessages[$chunkIndex], $chunk); + $chunkIndex++; + } + + $this->assertSame(count($expectedMessages), $chunkIndex); + } + + public function testStreamYieldsChunksAcrossMessageBoundariesWhenChunkSizeIsSmaller() + { + $stream = Http::withOptions(['stream' => true]) + ->get('http://example.test/stream') + ->stream(chunkLength: 10); + + $expectedMessages = [ + 'Streamed M', + "essage 1\nS", + 'treamed Me', + "ssage 2\nSt", + 'reamed Mes', + "sage 3\nStr", + 'eamed Mess', + "age 4\nStre", + 'amed Messa', + 'ge 5', + ]; + + $chunkIndex = 0; + foreach ($stream as $chunk) { + $this->assertSame($expectedMessages[$chunkIndex], $chunk); + $chunkIndex++; + } + + $this->assertSame(count($expectedMessages), $chunkIndex); + } + + public function testStreamLinesYieldsCompleteLinesRegardlessOfChunkSize() + { + $streams = [ + Http::withOptions(['stream' => true]) + ->get('http://example.test/stream') + ->streamLines(), // reads single bytes + Http::withOptions(['stream' => true]) + ->get('http://example.test/stream') + ->streamLines(chunkLength: 17), // chunk length smaller than line length + Http::withOptions(['stream' => true]) + ->get('http://example.test/stream') + ->streamLines(chunkLength: 27), // chunk length larger than line length + ]; + + $expectedMessages = [ + 'Streamed Message 1', + 'Streamed Message 2', + 'Streamed Message 3', + 'Streamed Message 4', + 'Streamed Message 5', + ]; + + foreach ($streams as $stream) { + $chunkIndex = 0; + foreach ($stream as $chunk) { + $this->assertSame($expectedMessages[$chunkIndex], $chunk); + $chunkIndex++; + } + + $this->assertSame(count($expectedMessages), $chunkIndex); + } + } + + public function testStreamLinesSupportsCustomSeparators() + { + // Test with pipe separator + Http::fake(['http://example.test/pipe' => fn () => Http::response('Line 1|Line 2|Line 3')]); + + $pipeStream = Http::withOptions(['stream' => true]) + ->get('http://example.test/pipe') + ->streamLines(separator: '|'); + + $expectedPipeMessages = ['Line 1', 'Line 2', 'Line 3']; + foreach ($pipeStream as $i => $chunk) { + $this->assertSame($expectedPipeMessages[$i], $chunk); + } + + // Test with comma separator + Http::fake(['http://example.test/comma' => fn () => Http::response('Item1,Item2,Item3')]); + + $commaStream = Http::withOptions(['stream' => true]) + ->get('http://example.test/comma') + ->streamLines(separator: ','); + + $expectedCommaMessages = ['Item1', 'Item2', 'Item3']; + foreach ($commaStream as $i => $chunk) { + $this->assertSame($expectedCommaMessages[$i], $chunk); + } + + // Test with multi-character separator + Http::fake(['http://example.test/multi' => fn () => Http::response('First||Second||Third')]); + + $multiStream = Http::withOptions(['stream' => true]) + ->get('http://example.test/multi') + ->streamLines(separator: '||'); + + $expectedMultiMessages = ['First', 'Second', 'Third']; + foreach ($multiStream as $i => $chunk) { + $this->assertSame($expectedMultiMessages[$i], $chunk); + } + } +}