44
55use Evenement \EventEmitter ;
66use Psr \Http \Message \RequestInterface ;
7+ use Psr \Http \Message \ResponseInterface ;
8+ use React \Http \Message \Response ;
79use React \Promise ;
810use React \Socket \ConnectionInterface ;
911use React \Socket \ConnectorInterface ;
1416 * @event response
1517 * @event drain
1618 * @event error
17- * @event end
19+ * @event close
1820 * @internal
1921 */
2022class ClientRequestStream extends EventEmitter implements WritableStreamInterface
@@ -31,9 +33,11 @@ class ClientRequestStream extends EventEmitter implements WritableStreamInterfac
3133 private $ request ;
3234
3335 /** @var ?ConnectionInterface */
34- private $ stream ;
36+ private $ connection ;
37+
38+ /** @var string */
39+ private $ buffer = '' ;
3540
36- private $ buffer ;
3741 private $ responseFactory ;
3842 private $ state = self ::STATE_INIT ;
3943 private $ ended = false ;
@@ -56,22 +60,22 @@ private function writeHead()
5660 $ this ->state = self ::STATE_WRITING_HEAD ;
5761
5862 $ request = $ this ->request ;
59- $ streamRef = &$ this ->stream ;
63+ $ connectionRef = &$ this ->connection ;
6064 $ stateRef = &$ this ->state ;
6165 $ pendingWrites = &$ this ->pendingWrites ;
6266 $ that = $ this ;
6367
6468 $ promise = $ this ->connect ();
6569 $ promise ->then (
66- function (ConnectionInterface $ stream ) use ($ request , &$ streamRef , &$ stateRef , &$ pendingWrites , $ that ) {
67- $ streamRef = $ stream ;
68- assert ($ streamRef instanceof ConnectionInterface);
70+ function (ConnectionInterface $ connection ) use ($ request , &$ connectionRef , &$ stateRef , &$ pendingWrites , $ that ) {
71+ $ connectionRef = $ connection ;
72+ assert ($ connectionRef instanceof ConnectionInterface);
6973
70- $ stream ->on ('drain ' , array ($ that , 'handleDrain ' ));
71- $ stream ->on ('data ' , array ($ that , 'handleData ' ));
72- $ stream ->on ('end ' , array ($ that , 'handleEnd ' ));
73- $ stream ->on ('error ' , array ($ that , 'handleError ' ));
74- $ stream ->on ('close ' , array ($ that , 'handleClose ' ));
74+ $ connection ->on ('drain ' , array ($ that , 'handleDrain ' ));
75+ $ connection ->on ('data ' , array ($ that , 'handleData ' ));
76+ $ connection ->on ('end ' , array ($ that , 'handleEnd ' ));
77+ $ connection ->on ('error ' , array ($ that , 'handleError ' ));
78+ $ connection ->on ('close ' , array ($ that , 'close ' ));
7579
7680 assert ($ request instanceof RequestInterface);
7781 $ headers = "{$ request ->getMethod ()} {$ request ->getRequestTarget ()} HTTP/ {$ request ->getProtocolVersion ()}\r\n" ;
@@ -81,7 +85,7 @@ function (ConnectionInterface $stream) use ($request, &$streamRef, &$stateRef, &
8185 }
8286 }
8387
84- $ more = $ stream ->write ($ headers . "\r\n" . $ pendingWrites );
88+ $ more = $ connection ->write ($ headers . "\r\n" . $ pendingWrites );
8589
8690 assert ($ stateRef === ClientRequestStream::STATE_WRITING_HEAD );
8791 $ stateRef = ClientRequestStream::STATE_HEAD_WRITTEN ;
@@ -111,7 +115,7 @@ public function write($data)
111115
112116 // write directly to connection stream if already available
113117 if (self ::STATE_HEAD_WRITTEN <= $ this ->state ) {
114- return $ this ->stream ->write ($ data );
118+ return $ this ->connection ->write ($ data );
115119 }
116120
117121 // otherwise buffer and try to establish connection
@@ -155,26 +159,50 @@ public function handleData($data)
155159 $ response = gPsr \parse_response ($ this ->buffer );
156160 $ bodyChunk = (string ) $ response ->getBody ();
157161 } catch (\InvalidArgumentException $ exception ) {
158- $ this ->emit ('error ' , array ($ exception ));
159- }
160-
161- $ this ->buffer = null ;
162-
163- $ this ->stream ->removeListener ('drain ' , array ($ this , 'handleDrain ' ));
164- $ this ->stream ->removeListener ('data ' , array ($ this , 'handleData ' ));
165- $ this ->stream ->removeListener ('end ' , array ($ this , 'handleEnd ' ));
166- $ this ->stream ->removeListener ('error ' , array ($ this , 'handleError ' ));
167- $ this ->stream ->removeListener ('close ' , array ($ this , 'handleClose ' ));
168-
169- if (!isset ($ response )) {
162+ $ this ->closeError ($ exception );
170163 return ;
171164 }
172165
173- $ this ->stream ->on ('close ' , array ($ this , 'handleClose ' ));
166+ // response headers successfully received => remove listeners for connection events
167+ $ connection = $ this ->connection ;
168+ assert ($ connection instanceof ConnectionInterface);
169+ $ connection ->removeListener ('drain ' , array ($ this , 'handleDrain ' ));
170+ $ connection ->removeListener ('data ' , array ($ this , 'handleData ' ));
171+ $ connection ->removeListener ('end ' , array ($ this , 'handleEnd ' ));
172+ $ connection ->removeListener ('error ' , array ($ this , 'handleError ' ));
173+ $ connection ->removeListener ('close ' , array ($ this , 'close ' ));
174+ $ this ->connection = null ;
175+ $ this ->buffer = '' ;
176+
177+ // take control over connection handling and close connection once response body closes
178+ $ that = $ this ;
179+ $ input = $ body = new CloseProtectionStream ($ connection );
180+ $ input ->on ('close ' , function () use ($ connection , $ that ) {
181+ $ connection ->close ();
182+ $ that ->close ();
183+ });
184+
185+ // determine length of response body
186+ $ length = null ;
187+ $ code = $ response ->getStatusCode ();
188+ if ($ this ->request ->getMethod () === 'HEAD ' || ($ code >= 100 && $ code < 200 ) || $ code == Response::STATUS_NO_CONTENT || $ code == Response::STATUS_NOT_MODIFIED ) {
189+ $ length = 0 ;
190+ } elseif (\strtolower ($ response ->getHeaderLine ('Transfer-Encoding ' )) === 'chunked ' ) {
191+ $ body = new ChunkedDecoder ($ body );
192+ } elseif ($ response ->hasHeader ('Content-Length ' )) {
193+ $ length = (int ) $ response ->getHeaderLine ('Content-Length ' );
194+ }
195+ $ response = $ response ->withBody ($ body = new ReadableBodyStream ($ body , $ length ));
174196
175- $ this ->emit ('response ' , array ($ response , $ this ->stream ));
197+ // emit response with streaming response body (see `Sender`)
198+ $ this ->emit ('response ' , array ($ response , $ body ));
176199
177- $ this ->stream ->emit ('data ' , array ($ bodyChunk ));
200+ // re-emit HTTP response body to trigger body parsing if parts of it are buffered
201+ if ($ bodyChunk !== '' ) {
202+ $ input ->handleData ($ bodyChunk );
203+ } elseif ($ length === 0 ) {
204+ $ input ->handleEnd ();
205+ }
178206 }
179207 }
180208
@@ -196,12 +224,6 @@ public function handleError(\Exception $error)
196224 ));
197225 }
198226
199- /** @internal */
200- public function handleClose ()
201- {
202- $ this ->close ();
203- }
204-
205227 /** @internal */
206228 public function closeError (\Exception $ error )
207229 {
@@ -220,9 +242,11 @@ public function close()
220242
221243 $ this ->state = self ::STATE_END ;
222244 $ this ->pendingWrites = '' ;
245+ $ this ->buffer = '' ;
223246
224- if ($ this ->stream ) {
225- $ this ->stream ->close ();
247+ if ($ this ->connection instanceof ConnectionInterface) {
248+ $ this ->connection ->close ();
249+ $ this ->connection = null ;
226250 }
227251
228252 $ this ->emit ('close ' );
0 commit comments