Skip to content

Commit 41afe92

Browse files
committed
Merge branch 'master' into update-dependencies
2 parents 1bce0b2 + 44ff9ba commit 41afe92

20 files changed

+300
-251
lines changed

README.md

Lines changed: 24 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,68 @@
11
# PHP MQTT Client
22

3-
phpMqttClient is a mqtt client library for PHP. Its based on the reactPHP socket-client and added the mqtt protocol specific functions. I hope its a better starting point that the existing php mqtt libraries.
3+
phpMqttClient is an MQTT client library for PHP. Its based on the reactPHP socket-client and added the MQTT protocol
4+
specific functions. I hope its a better starting point that the existing PHP MQTT libraries.
45

56
[![Build Status](https://travis-ci.org/oliverlorenz/phpMqttClient.svg?branch=master)](https://travis-ci.org/oliverlorenz/phpMqttClient)
67
[![Code Climate](https://codeclimate.com/github/oliverlorenz/phpMqttClient/badges/gpa.svg)](https://codeclimate.com/github/oliverlorenz/phpMqttClient)
78
[![Test Coverage](https://codeclimate.com/github/oliverlorenz/phpMqttClient/badges/coverage.svg)](https://codeclimate.com/github/oliverlorenz/phpMqttClient/coverage)
89

9-
### Notice - (May 12th, 2015)
10-
This is library is not stable currently. Its an early state, but I am working on it. I will add more features if I need them. If you need features: please give feedback or contribute to get this library running.
11-
12-
Currently works:
13-
* connect (clean session, no other connect flags)
14-
* disconnect
15-
* publish
16-
* subscribe
17-
1810
## Goal
1911

20-
Goal of this project is easy to use MQTT client for PHP in a modern architecture without using any php modules. Currently, only protocol version 4 (mqtt 3.1.1) is implemented.
12+
Goal of this project is easy to use MQTT client for PHP in a modern architecture without using any php modules.
13+
Currently, only protocol version 4 (mqtt 3.1.1) is implemented.
2114
* Protocol specifications: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/csprd02/mqtt-v3.1.1-csprd02.html
2215

2316
## Example publish
2417

2518
```php
26-
<?php
27-
28-
require __DIR__ . '/../vendor/autoload.php';
29-
30-
$config = include('config.php');
19+
$config = require 'config.php';
3120

32-
$loop = React\EventLoop\Factory::create();
33-
34-
$dnsResolverFactory = new React\Dns\Resolver\Factory();
35-
$resolver = $dnsResolverFactory->createCached('8.8.8.8', $loop);
36-
37-
$version = new oliverlorenz\reactphpmqtt\protocol\Version4();
38-
$connector = new oliverlorenz\reactphpmqtt\Connector($loop, $resolver, $version);
21+
$connector = ClientFactory::createClient(new Version4());
3922

4023
$p = $connector->create($config['server'], $config['port'], $config['options']);
41-
$p->then(function(\React\Stream\Stream $stream) use ($connector) {
24+
$p->then(function(Stream $stream) use ($connector) {
4225
return $connector->publish($stream, 'a/b', 'example message');
4326
});
44-
$loop->run();
27+
$connector->getLoop()->run();
4528
```
4629

4730
## Example subscribe
4831

4932
```php
50-
<?php
33+
$config = require 'config.php';
5134

52-
require __DIR__ . '/../vendor/autoload.php';
53-
54-
$config = include('config.php');
55-
56-
$loop = React\EventLoop\Factory::create();
57-
58-
$dnsResolverFactory = new React\Dns\Resolver\Factory();
59-
$resolver = $dnsResolverFactory->createCached('8.8.8.8', $loop);
60-
61-
$version = new oliverlorenz\reactphpmqtt\protocol\Version4();
62-
$connector = new oliverlorenz\reactphpmqtt\Connector($loop, $resolver, $version);
35+
$connector = ClientFactory::createClient(new Version4());
6336

6437
$p = $connector->create($config['server'], $config['port'], $config['options']);
6538
$p->then(function(\React\Stream\Stream $stream) use ($connector) {
66-
$stream->on('PUBLISH', function(\oliverlorenz\reactphpmqtt\packet\Publish $message) {
39+
$stream->on(Publish::EVENT, function(Publish $message) {
6740
print_r($message);
6841
});
6942

7043
$connector->subscribe($stream, 'a/b', 0);
7144
$connector->subscribe($stream, 'a/c', 0);
7245
});
7346

74-
$loop->run();
47+
$connector->getLoop()->run();
7548
```
7649

77-
# Run tests
50+
## Notice - (May 12th, 2015)
51+
This is library is not stable currently. Its an early state, but I am working on it. I will add more features if I need them. If you need features: please give feedback or contribute to get this library running.
52+
53+
Currently works:
54+
* connect (clean session, no other connect flags)
55+
* disconnect
56+
* publish
57+
* subscribe
58+
59+
## Run tests
7860

7961
./vendor/bin/phpunit -c ./tests/phpunit.xml ./tests
8062

8163

82-
# Troubleshooting
64+
## Troubleshooting
8365

84-
## Why does the connect to localhost:1883 not work?
66+
### Why does the connect to localhost:1883 not work?
8567

8668
The answer is simple: In the example is the DNS 8.8.8.8 configured. Your local server is not visible for them, so you can't connect.

examples/config.php.example

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
$config = array(
44
'server' => 'yourMqttBroker.tld',
55
'port' => 1883,
6-
'options' => null,
6+
'options' => new \oliverlorenz\reactphpmqtt\packet\ConnectionOptions(array(
7+
'keepAlive' => 120,
8+
)),
79
);
810

911
return $config;

examples/connectPublish.php

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
11
<?php
22

3-
require __DIR__ . '/../vendor/autoload.php';
4-
5-
$config = include('config.php');
3+
use oliverlorenz\reactphpmqtt\ClientFactory;
4+
use oliverlorenz\reactphpmqtt\protocol\Version4;
5+
use React\Stream\Stream;
66

7-
$loop = React\EventLoop\Factory::create();
7+
require __DIR__ . '/../vendor/autoload.php';
88

9-
$dnsResolverFactory = new React\Dns\Resolver\Factory();
10-
$resolver = $dnsResolverFactory->createCached('8.8.8.8', $loop);
9+
$config = require 'config.php';
1110

12-
$version = new oliverlorenz\reactphpmqtt\protocol\Version4();
13-
$connector = new oliverlorenz\reactphpmqtt\Connector($loop, $resolver, $version);
11+
$connector = ClientFactory::createClient(new Version4(), '8.8.8.8');
1412

1513
$p = $connector->create($config['server'], $config['port'], $config['options']);
16-
$p->then(function(\React\Stream\Stream $stream) use ($connector) {
14+
$p->then(function(Stream $stream) use ($connector) {
1715
return $connector->publish($stream, 'hello/world', 'example message');
1816
});
1917

20-
$loop->run();
18+
$connector->getLoop()->run();

examples/connectSubscribe.php

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
11
<?php
22

3-
require __DIR__ . '/../vendor/autoload.php';
4-
5-
$config = include('config.php');
3+
use oliverlorenz\reactphpmqtt\ClientFactory;
4+
use oliverlorenz\reactphpmqtt\packet\Publish;
5+
use oliverlorenz\reactphpmqtt\protocol\Version4;
6+
use React\Stream\Stream;
67

7-
$loop = React\EventLoop\Factory::create();
8+
require __DIR__ . '/../vendor/autoload.php';
89

9-
$dnsResolverFactory = new React\Dns\Resolver\Factory();
10-
$resolver = $dnsResolverFactory->createCached('8.8.8.8', $loop);
10+
$config = require 'config.php';
1111

12-
$version = new oliverlorenz\reactphpmqtt\protocol\Version4();
13-
$connector = new oliverlorenz\reactphpmqtt\Connector($loop, $resolver, $version);
12+
$connector = ClientFactory::createClient(new Version4(), '8.8.8.8');
1413

1514
$p = $connector->create($config['server'], $config['port'], $config['options']);
16-
$p->then(function(\React\Stream\Stream $stream) use ($connector) {
17-
$stream->on('PUBLISH', function(\oliverlorenz\reactphpmqtt\packet\Publish $message) {
15+
$p->then(function(Stream $stream) use ($connector) {
16+
$stream->on(Publish::EVENT, function(Publish $message) {
1817
printf(
1918
'Received payload "%s" for topic "%s"%s',
2019
$message->getPayload(),
@@ -26,4 +25,4 @@
2625
return $connector->subscribe($stream, 'hello/world', 0);
2726
});
2827

29-
$loop->run();
28+
$connector->getLoop()->run();

src/ClientFactory.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
namespace oliverlorenz\reactphpmqtt;
4+
5+
use oliverlorenz\reactphpmqtt\protocol\Version;
6+
use React\Dns\Resolver\Factory as DnsResolverFactory;
7+
use React\EventLoop\Factory as EventLoopFactory;
8+
9+
class ClientFactory
10+
{
11+
public static function createClient(Version $version, $resolverIp = '8.8.8.8')
12+
{
13+
$loop = EventLoopFactory::create();
14+
$resolver = self::createDnsResolver($resolverIp, $loop);
15+
16+
return new Connector($loop, $resolver, $version);
17+
}
18+
19+
public static function createSecureClient(Version $version, $resolverIp = '8.8.8.8')
20+
{
21+
$loop = EventLoopFactory::create();
22+
$resolver = self::createDnsResolver($resolverIp, $loop);
23+
24+
return new SecureConnector($loop, $resolver, $version);
25+
}
26+
27+
private static function createDnsResolver($resolverIp, $loop)
28+
{
29+
$dnsResolverFactory = new DnsResolverFactory();
30+
31+
return $dnsResolverFactory->createCached($resolverIp, $loop);
32+
}
33+
}

src/Connector.php

Lines changed: 25 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@
1515
use oliverlorenz\reactphpmqtt\packet\Factory;
1616
use oliverlorenz\reactphpmqtt\packet\MessageHelper;
1717
use oliverlorenz\reactphpmqtt\packet\PingRequest;
18-
use oliverlorenz\reactphpmqtt\packet\PingResponse;
1918
use oliverlorenz\reactphpmqtt\packet\Publish;
20-
use oliverlorenz\reactphpmqtt\packet\PublishReceived;
21-
use oliverlorenz\reactphpmqtt\packet\PublishRelease;
2219
use oliverlorenz\reactphpmqtt\packet\Subscribe;
2320
use oliverlorenz\reactphpmqtt\packet\SubscribeAck;
2421
use oliverlorenz\reactphpmqtt\packet\Unsubscribe;
2522
use oliverlorenz\reactphpmqtt\packet\UnsubscribeAck;
2623
use oliverlorenz\reactphpmqtt\protocol\Version;
24+
use oliverlorenz\reactphpmqtt\protocol\Violation as ProtocolViolation;
2725
use React\Dns\Resolver\Resolver;
2826
use React\EventLoop\LoopInterface;
2927
use React\EventLoop\Timer\Timer;
@@ -76,55 +74,44 @@ public function create(
7674
->then(function (Stream $stream) {
7775
return $this->listenForPackets($stream);
7876
})
79-
->then(function(Stream $stream) {
80-
return $this->keepAlive($stream);
77+
->then(function(Stream $stream) use ($options) {
78+
return $this->keepAlive($stream, $options->keepAlive);
8179
});
8280
}
8381

8482
private function listenForPackets(Stream $stream)
8583
{
8684
$stream->on('data', function ($rawData) use ($stream) {
87-
$messages = $this->splitMessage($rawData);
88-
foreach ($messages as $data) {
89-
try {
90-
$message = Factory::getByMessage($this->version, $data);
91-
echo "received:\t" . get_class($message) . "\n";
92-
93-
if ($message instanceof ConnectionAck) {
94-
$stream->emit('CONNECTION_ACK', array($message));
95-
} elseif ($message instanceof PingResponse) {
96-
$stream->emit('PING_RESPONSE', array($message));
97-
} elseif ($message instanceof Publish) {
98-
$stream->emit('PUBLISH', array($message));
99-
} elseif ($message instanceof PublishReceived) {
100-
$stream->emit('PUBLISH_RECEIVED', array($message));
101-
} elseif ($message instanceof PublishRelease) {
102-
$stream->emit('PUBLISH_RELEASE', array($message));
103-
} elseif ($message instanceof UnsubscribeAck) {
104-
$stream->emit('UNSUBSCRIBE_ACK', array($message));
105-
} elseif ($message instanceof SubscribeAck) {
106-
$stream->emit('SUBSCRIBE_ACK', array($message));
107-
}
108-
} catch (\InvalidArgumentException $ex) {
109-
85+
try {
86+
foreach (Factory::getNextPacket($this->version, $rawData) as $packet) {
87+
$stream->emit($packet::EVENT, [$packet]);
88+
echo "received:\t" . get_class($packet) . PHP_EOL;
11089
}
11190
}
91+
catch (ProtocolViolation $e) {
92+
//TODO Actually, the spec says to disconnect if you receive invalid data.
93+
$stream->emit('INVALID', [$e]);
94+
}
11295
});
11396

11497
$deferred = new Deferred();
115-
$stream->on('CONNECTION_ACK', function($message) use ($stream, $deferred) {
98+
$stream->on(ConnectionAck::EVENT, function($message) use ($stream, $deferred) {
11699
$deferred->resolve($stream);
117100
});
118101

119102
return $deferred->promise();
120103
}
121104

122-
private function keepAlive(Stream $stream)
105+
private function keepAlive(Stream $stream, $keepAlive)
123106
{
124-
$this->getLoop()->addPeriodicTimer(10, function(Timer $timer) use ($stream) {
125-
$packet = new PingRequest($this->version);
126-
$this->sendPacketToStream($stream, $packet);
127-
});
107+
if($keepAlive > 0) {
108+
$interval = $keepAlive / 2;
109+
110+
$this->getLoop()->addPeriodicTimer($interval, function(Timer $timer) use ($stream) {
111+
$packet = new PingRequest($this->version);
112+
$this->sendPacketToStream($stream, $packet);
113+
});
114+
}
128115

129116
return new FulfilledPromise($stream);
130117
}
@@ -142,7 +129,8 @@ public function connect(Stream $stream, ConnectionOptions $options) {
142129
$options->willTopic,
143130
$options->willMessage,
144131
$options->willQos,
145-
$options->willRetain
132+
$options->willRetain,
133+
$options->keepAlive
146134
);
147135
$message = $packet->get();
148136
echo MessageHelper::getReadableByRawString($message);
@@ -178,7 +166,7 @@ public function subscribe(Stream $stream, $topic, $qos = 0)
178166
$this->sendPacketToStream($stream, $packet);
179167

180168
$deferred = new Deferred();
181-
$stream->on('SUBSCRIBE_ACK', function($message) use ($stream, $deferred) {
169+
$stream->on(SubscribeAck::EVENT, function($message) use ($stream, $deferred) {
182170
$deferred->resolve($stream);
183171
});
184172

@@ -197,7 +185,7 @@ public function unsubscribe(Stream $stream, $topic)
197185
$this->sendPacketToStream($stream, $packet);
198186

199187
$deferred = new Deferred();
200-
$stream->on('UNSUBSCRIBE_ACK', function($message) use ($stream, $deferred) {
188+
$stream->on(UnsubscribeAck::EVENT, function($message) use ($stream, $deferred) {
201189
$deferred->resolve($stream);
202190
});
203191

@@ -238,24 +226,6 @@ public function publish(Stream $stream, $topic, $message, $qos = 0, $dup = false
238226
return $deferred->promise();
239227
}
240228

241-
private function splitMessage($data)
242-
{
243-
$messages = array();
244-
while(true) {
245-
if (isset($data{1})) {
246-
$length = ord($data{1});
247-
$messages[] = substr($data, 0, $length + 2);
248-
$data = substr($data, $length + 2);
249-
}
250-
251-
if (empty($data)) {
252-
break;
253-
}
254-
}
255-
256-
return $messages;
257-
}
258-
259229
/**
260230
* @return LoopInterface
261231
*/

0 commit comments

Comments
 (0)