Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
}
},
"require": {
"react/socket": "^1.1"
"react/socket": "^1.1",
"psr/log": "^1.1"
},
"require-dev": {
"phpunit/phpunit": "^5.0.0"
Expand Down
9 changes: 5 additions & 4 deletions src/ClientFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace oliverlorenz\reactphpmqtt;

use oliverlorenz\reactphpmqtt\protocol\Version;
use Psr\Log\LoggerInterface;
use React\Dns\Resolver\Factory as DnsResolverFactory;
use React\EventLoop\Factory as EventLoopFactory;
use React\Socket\DnsConnector;
Expand All @@ -11,21 +12,21 @@

class ClientFactory
{
public static function createClient(Version $version, $resolverIp = '8.8.8.8')
public static function createClient(Version $version, $resolverIp = '8.8.8.8', LoggerInterface $logger = null)
{
$loop = EventLoopFactory::create();
$connector = self::createDnsConnector($resolverIp, $loop);

return new MqttClient($loop, $connector, $version);
return new MqttClient($loop, $connector, $version, $logger);
}

public static function createSecureClient(Version $version, $resolverIp = '8.8.8.8')
public static function createSecureClient(Version $version, $resolverIp = '8.8.8.8', LoggerInterface $logger = null)
{
$loop = EventLoopFactory::create();
$connector = self::createDnsConnector($resolverIp, $loop);
$secureConnector = new SecureConnector($connector, $loop);

return new MqttClient($loop, $secureConnector, $version);
return new MqttClient($loop, $secureConnector, $version, $logger);
}

private static function createDnsConnector($resolverIp, $loop)
Expand Down
26 changes: 22 additions & 4 deletions src/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use oliverlorenz\reactphpmqtt\packet\UnsubscribeAck;
use oliverlorenz\reactphpmqtt\protocol\Version;
use oliverlorenz\reactphpmqtt\protocol\Violation as ProtocolViolation;
use Psr\Log\LoggerInterface;
use React\EventLoop\LoopInterface as Loop;
use React\EventLoop\Timer\Timer;
use React\Promise\Deferred;
Expand All @@ -38,14 +39,24 @@ class MqttClient
private $loop;
private $socketConnector;
private $version;
private $logger;

private $messageCounter = 1;

public function __construct(Loop $loop, ReactConnector $connector, Version $version)
public function __construct(Loop $loop, ReactConnector $connector, Version $version, LoggerInterface $logger = null)
{
$this->version = $version;
$this->socketConnector = $connector;
$this->loop = $loop;
$this->logger = $logger;
}

/**
* @param LoggerInterface|null $logger
*/
public function setLogger(LoggerInterface $logger = null)
{
$this->logger = $logger;
}

/**
Expand Down Expand Up @@ -85,7 +96,7 @@ private function listenForPackets(Connection $stream)
try {
foreach (Factory::getNextPacket($this->version, $rawData) as $packet) {
$stream->emit($packet::EVENT, [$packet]);
echo "received:\t" . get_class($packet) . PHP_EOL;
$this->debug("received:\t" . get_class($packet));
}
}
catch (ProtocolViolation $e) {
Expand Down Expand Up @@ -133,7 +144,7 @@ public function sendConnectPacket(Connection $stream, ConnectionOptions $options
$options->keepAlive
);
$message = $packet->get();
echo MessageHelper::getReadableByRawString($message);
$this->debug(MessageHelper::getReadableByRawString($message));

$deferred = new Deferred();
$stream->on(ConnectionAck::EVENT, function($message) use ($stream, $deferred) {
Expand All @@ -153,7 +164,8 @@ public function sendConnectPacket(Connection $stream, ConnectionOptions $options

private function sendPacketToStream(Connection $stream, ControlPacket $controlPacket)
{
echo "send:\t\t" . get_class($controlPacket) . "\n";
$this->debug("send:\t\t" . get_class($controlPacket));

$message = $controlPacket->get();

return $stream->write($message);
Expand Down Expand Up @@ -249,4 +261,10 @@ private function getDefaultConnectionOptions()
{
return new ConnectionOptions();
}

private function debug($message) {
if ($this->logger !== null) {
$this->logger->debug($message);
}
}
}