1515use oliverlorenz \reactphpmqtt \packet \Factory ;
1616use oliverlorenz \reactphpmqtt \packet \MessageHelper ;
1717use oliverlorenz \reactphpmqtt \packet \PingRequest ;
18- use oliverlorenz \reactphpmqtt \packet \PingResponse ;
1918use oliverlorenz \reactphpmqtt \packet \Publish ;
20- use oliverlorenz \reactphpmqtt \packet \PublishReceived ;
21- use oliverlorenz \reactphpmqtt \packet \PublishRelease ;
2219use oliverlorenz \reactphpmqtt \packet \Subscribe ;
2320use oliverlorenz \reactphpmqtt \packet \SubscribeAck ;
2421use oliverlorenz \reactphpmqtt \packet \Unsubscribe ;
@@ -88,31 +85,17 @@ private function listenForPackets(Stream $stream)
8885 foreach ($ messages as $ data ) {
8986 try {
9087 $ message = Factory::getByMessage ($ this ->version , $ data );
91- echo " received: \t" . get_class ( $ message) . "\n" ;
88+ $ stream -> emit ( $ message :: EVENT , [ $ message]) ;
9289
93- if ($ message instanceof ConnectionAck) {
94- $ stream ->emit ('CONNECTION_ACK ' , array ($ message ));
95- } elseif ($ message instanceof PingResponse) {
96- $ stream ->emit ('PING_RESPONSE ' , array ($ message ));
97- } elseif ($ message instanceof Publish) {
98- $ stream ->emit ('PUBLISH ' , array ($ message ));
99- } elseif ($ message instanceof PublishReceived) {
100- $ stream ->emit ('PUBLISH_RECEIVED ' , array ($ message ));
101- } elseif ($ message instanceof PublishRelease) {
102- $ stream ->emit ('PUBLISH_RELEASE ' , array ($ message ));
103- } elseif ($ message instanceof UnsubscribeAck) {
104- $ stream ->emit ('UNSUBSCRIBE_ACK ' , array ($ message ));
105- } elseif ($ message instanceof SubscribeAck) {
106- $ stream ->emit ('SUBSCRIBE_ACK ' , array ($ message ));
107- }
90+ echo "received: \t" . get_class ($ message ) . "\n" ;
10891 } catch (\InvalidArgumentException $ ex ) {
10992
11093 }
11194 }
11295 });
11396
11497 $ deferred = new Deferred ();
115- $ stream ->on (' CONNECTION_ACK ' , function ($ message ) use ($ stream , $ deferred ) {
98+ $ stream ->on (ConnectionAck:: EVENT , function ($ message ) use ($ stream , $ deferred ) {
11699 $ deferred ->resolve ($ stream );
117100 });
118101
@@ -183,7 +166,7 @@ public function subscribe(Stream $stream, $topic, $qos = 0)
183166 $ this ->sendPacketToStream ($ stream , $ packet );
184167
185168 $ deferred = new Deferred ();
186- $ stream ->on (' SUBSCRIBE_ACK ' , function ($ message ) use ($ stream , $ deferred ) {
169+ $ stream ->on (SubscribeAck:: EVENT , function ($ message ) use ($ stream , $ deferred ) {
187170 $ deferred ->resolve ($ stream );
188171 });
189172
@@ -202,7 +185,7 @@ public function unsubscribe(Stream $stream, $topic)
202185 $ this ->sendPacketToStream ($ stream , $ packet );
203186
204187 $ deferred = new Deferred ();
205- $ stream ->on (' UNSUBSCRIBE_ACK ' , function ($ message ) use ($ stream , $ deferred ) {
188+ $ stream ->on (UnsubscribeAck:: EVENT , function ($ message ) use ($ stream , $ deferred ) {
206189 $ deferred ->resolve ($ stream );
207190 });
208191
0 commit comments