2222use oliverlorenz \reactphpmqtt \packet \UnsubscribeAck ;
2323use oliverlorenz \reactphpmqtt \protocol \Version ;
2424use oliverlorenz \reactphpmqtt \protocol \Violation as ProtocolViolation ;
25- use React \Dns \Resolver \Resolver ;
26- use React \EventLoop \LoopInterface ;
25+ use React \EventLoop \LoopInterface as Loop ;
2726use React \EventLoop \Timer \Timer ;
2827use React \Promise \Deferred ;
2928use React \Promise \FulfilledPromise ;
3029use React \Promise \PromiseInterface ;
31- use React \SocketClient \ConnectorInterface ;
32- use React \Stream \Stream ;
33-
34- class Connector implements ConnectorInterface {
30+ use React \Socket \ConnectionInterface as Connection ;
31+ use React \Socket \ConnectorInterface as ReactConnector ;
3532
33+ class MqttClient
34+ {
3635 /**
37- * @var $loop LoopInterface
36+ * @var $loop Loop
3837 */
3938 private $ loop ;
40- protected $ socketConnector ;
39+ private $ socketConnector ;
4140 private $ version ;
41+
4242 private $ messageCounter = 1 ;
4343
44- public function __construct (LoopInterface $ loop , Resolver $ resolver , Version $ version )
44+ public function __construct (Loop $ loop , ReactConnector $ connector , Version $ version )
4545 {
4646 $ this ->version = $ version ;
47- $ this ->socketConnector = new \ React \ SocketClient \ Connector ( $ loop , $ resolver ) ;
47+ $ this ->socketConnector = $ connector ;
4848 $ this ->loop = $ loop ;
4949 }
5050
5151 /**
5252 * Creates a new connection
5353 *
54- * @param string $host
55- * @param int $port [optional]
54+ * @param string $uri
5655 * @param ConnectionOptions|null $options [optional]
5756 *
5857 * @return PromiseInterface Resolves to a \React\Stream\Stream once a connection has been established
5958 */
60- public function create (
61- $ host ,
62- $ port = 1883 ,
59+ public function connect (
60+ $ uri ,
6361 ConnectionOptions $ options = null
6462 ) {
6563 // Set default connection options, if none provided
6664 if ($ options == null ) {
6765 $ options = $ this ->getDefaultConnectionOptions ();
6866 }
6967
70- return $ this ->socketConnector ->create ($ host , $ port )
71- ->then (function (Stream $ stream ) use ($ options ) {
72- return $ this ->connect ($ stream , $ options );
73- })
74- ->then (function (Stream $ stream ) {
75- return $ this ->listenForPackets ($ stream );
76- })
77- ->then (function (Stream $ stream ) use ($ options ) {
78- return $ this ->keepAlive ($ stream , $ options ->keepAlive );
79- });
68+ $ promise = $ this ->socketConnector ->connect ($ uri );
69+ $ promise ->then (function (Connection $ stream ) {
70+ $ this ->listenForPackets ($ stream );
71+ });
72+ $ connection = $ promise ->then (function (Connection $ stream ) use ($ options ) {
73+ return $ this ->sendConnectPacket ($ stream , $ options );
74+ });
75+ $ connection ->then (function (Connection $ stream ) use ($ options ) {
76+ return $ this ->keepAlive ($ stream , $ options ->keepAlive );
77+ });
78+
79+ return $ connection ;
8080 }
8181
82- private function listenForPackets (Stream $ stream )
82+ private function listenForPackets (Connection $ stream )
8383 {
84- $ stream ->on ('data ' , function ($ rawData ) use ($ stream ) {
84+ $ stream ->on ('data ' , function ($ rawData ) use ($ stream ) {
8585 try {
8686 foreach (Factory::getNextPacket ($ this ->version , $ rawData ) as $ packet ) {
8787 $ stream ->emit ($ packet ::EVENT , [$ packet ]);
@@ -93,16 +93,16 @@ private function listenForPackets(Stream $stream)
9393 $ stream ->emit ('INVALID ' , [$ e ]);
9494 }
9595 });
96-
97- $ deferred = new Deferred ();
98- $ stream ->on (ConnectionAck::EVENT , function ($ message ) use ($ stream , $ deferred ) {
99- $ deferred ->resolve ($ stream );
100- });
101-
102- return $ deferred ->promise ();
96+ //
97+ // $deferred = new Deferred();
98+ // $stream->on(ConnectionAck::EVENT, function($message) use ($stream, $deferred) {
99+ // $deferred->resolve($stream);
100+ // });
101+ //
102+ // return $deferred->promise();
103103 }
104104
105- private function keepAlive (Stream $ stream , $ keepAlive )
105+ private function keepAlive (Connection $ stream , $ keepAlive )
106106 {
107107 if ($ keepAlive > 0 ) {
108108 $ interval = $ keepAlive / 2 ;
@@ -119,7 +119,7 @@ private function keepAlive(Stream $stream, $keepAlive)
119119 /**
120120 * @return \React\Promise\Promise
121121 */
122- public function connect ( Stream $ stream , ConnectionOptions $ options ) {
122+ public function sendConnectPacket ( Connection $ stream , ConnectionOptions $ options ) {
123123 $ packet = new Connect (
124124 $ this ->version ,
125125 $ options ->username ,
@@ -136,16 +136,22 @@ public function connect(Stream $stream, ConnectionOptions $options) {
136136 echo MessageHelper::getReadableByRawString ($ message );
137137
138138 $ deferred = new Deferred ();
139- if ( $ stream ->write ( $ message )) {
139+ $ stream ->on (ConnectionAck:: EVENT , function ( $ message ) use ( $ stream , $ deferred ) {
140140 $ deferred ->resolve ($ stream );
141- } else {
142- $ deferred ->reject ();
143- }
141+ });
142+
143+ $ stream ->write ($ message );
144+ // $deferred = new Deferred();
145+ // if ($stream->write($message)) {
146+ // $deferred->resolve($stream);
147+ // } else {
148+ // $deferred->reject();
149+ // }
144150
145151 return $ deferred ->promise ();
146152 }
147153
148- private function sendPacketToStream (Stream $ stream , ControlPacket $ controlPacket )
154+ private function sendPacketToStream (Connection $ stream , ControlPacket $ controlPacket )
149155 {
150156 echo "send: \t\t" . get_class ($ controlPacket ) . "\n" ;
151157 $ message = $ controlPacket ->get ();
@@ -154,12 +160,12 @@ private function sendPacketToStream(Stream $stream, ControlPacket $controlPacket
154160 }
155161
156162 /**
157- * @param Stream $stream
163+ * @param Connection $stream
158164 * @param string $topic
159165 * @param int $qos
160166 * @return \React\Promise\Promise
161167 */
162- public function subscribe (Stream $ stream , $ topic , $ qos = 0 )
168+ public function subscribe (Connection $ stream , $ topic , $ qos = 0 )
163169 {
164170 $ packet = new Subscribe ($ this ->version );
165171 $ packet ->addSubscription ($ topic , $ qos );
@@ -174,11 +180,11 @@ public function subscribe(Stream $stream, $topic, $qos = 0)
174180 }
175181
176182 /**
177- * @param Stream $stream
183+ * @param Connection $stream
178184 * @param string $topic
179185 * @return \React\Promise\Promise
180186 */
181- public function unsubscribe (Stream $ stream , $ topic )
187+ public function unsubscribe (Connection $ stream , $ topic )
182188 {
183189 $ packet = new Unsubscribe ($ this ->version );
184190 $ packet ->removeSubscription ($ topic );
@@ -192,7 +198,7 @@ public function unsubscribe(Stream $stream, $topic)
192198 return $ deferred ->promise ();
193199 }
194200
195- public function disconnect (Stream $ stream )
201+ public function disconnect (Connection $ stream )
196202 {
197203 $ packet = new Disconnect ($ this ->version );
198204 $ this ->sendPacketToStream ($ stream , $ packet );
@@ -204,7 +210,7 @@ public function disconnect(Stream $stream)
204210 /**
205211 * @return \React\Promise\Promise
206212 */
207- public function publish (Stream $ stream , $ topic , $ message , $ qos = 0 , $ dup = false , $ retain = false )
213+ public function publish (Connection $ stream , $ topic , $ message , $ qos = 0 , $ dup = false , $ retain = false )
208214 {
209215 $ packet = new Publish ($ this ->version );
210216 $ packet ->setTopic ($ topic );
@@ -227,7 +233,7 @@ public function publish(Stream $stream, $topic, $message, $qos = 0, $dup = false
227233 }
228234
229235 /**
230- * @return LoopInterface
236+ * @return Loop
231237 */
232238 public function getLoop ()
233239 {
0 commit comments