88use React \EventLoop \LoopInterface ;
99use React \Http \Message \Response ;
1010use React \Http \Message \ServerRequest ;
11+ use React \Http \Middleware \InactiveConnectionTimeoutMiddleware ;
1112use React \Promise ;
1213use React \Promise \PromiseInterface ;
1314use React \Socket \ConnectionInterface ;
@@ -86,6 +87,8 @@ final class StreamingServer extends EventEmitter
8687
8788 /** @var Clock */
8889 private $ clock ;
90+ private $ loop ;
91+ private $ idleConnectionTimeout ;
8992
9093 /**
9194 * Creates an HTTP server that invokes the given callback for each incoming HTTP request
@@ -97,14 +100,18 @@ final class StreamingServer extends EventEmitter
97100 *
98101 * @param LoopInterface $loop
99102 * @param callable $requestHandler
103+ * @param float $idleConnectTimeout
100104 * @see self::listen()
101105 */
102- public function __construct (LoopInterface $ loop , $ requestHandler )
106+ public function __construct (LoopInterface $ loop , $ requestHandler, $ idleConnectTimeout = InactiveConnectionTimeoutMiddleware:: DEFAULT_TIMEOUT )
103107 {
104108 if (!\is_callable ($ requestHandler )) {
105109 throw new \InvalidArgumentException ('Invalid request handler given ' );
106110 }
107111
112+ $ this ->loop = $ loop ;
113+ $ this ->idleConnectionTimeout = $ idleConnectTimeout ;
114+
108115 $ this ->callback = $ requestHandler ;
109116 $ this ->clock = new Clock ($ loop );
110117 $ this ->parser = new RequestHeaderParser ($ this ->clock );
@@ -134,7 +141,27 @@ public function __construct(LoopInterface $loop, $requestHandler)
134141 */
135142 public function listen (ServerInterface $ socket )
136143 {
137- $ socket ->on ('connection ' , array ($ this ->parser , 'handle ' ));
144+ $ socket ->on ('connection ' , array ($ this , 'handle ' ));
145+ }
146+
147+ /** @internal */
148+ public function handle (ConnectionInterface $ conn )
149+ {
150+ $ timer = $ this ->loop ->addTimer ($ this ->idleConnectionTimeout , function () use ($ conn ) {
151+ $ conn ->close ();
152+ });
153+ $ loop = $ this ->loop ;
154+ $ conn ->once ('data ' , function () use ($ loop , $ timer ) {
155+ $ loop ->cancelTimer ($ timer );
156+ });
157+ $ conn ->on ('end ' , function () use ($ loop , $ timer ) {
158+ $ loop ->cancelTimer ($ timer );
159+ });
160+ $ conn ->on ('close ' , function () use ($ loop , $ timer ) {
161+ $ loop ->cancelTimer ($ timer );
162+ });
163+
164+ $ this ->parser ->handle ($ conn );
138165 }
139166
140167 /** @internal */
@@ -352,7 +379,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
352379
353380 // either wait for next request over persistent connection or end connection
354381 if ($ persist ) {
355- $ this ->parser -> handle ($ connection );
382+ $ this ->handle ($ connection );
356383 } else {
357384 $ connection ->end ();
358385 }
@@ -373,10 +400,10 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
373400 // write streaming body and then wait for next request over persistent connection
374401 if ($ persist ) {
375402 $ body ->pipe ($ connection , array ('end ' => false ));
376- $ parser = $ this -> parser ;
377- $ body ->on ('end ' , function () use ($ connection , $ parser , $ body ) {
403+ $ that = $ this ;
404+ $ body ->on ('end ' , function () use ($ connection , $ that , $ body ) {
378405 $ connection ->removeListener ('close ' , array ($ body , 'close ' ));
379- $ parser ->handle ($ connection );
406+ $ that ->handle ($ connection );
380407 });
381408 } else {
382409 $ body ->pipe ($ connection );
0 commit comments