Skip to content

Commit e15ecaa

Browse files
committed
Add basic configuration for Behat
Allowing us to more easily describe the behaviour of the features offered by this library.
1 parent 602418b commit e15ecaa

22 files changed

+961
-11
lines changed

.github/workflows/behat.yml

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
name: "Behat tests"
2+
3+
on:
4+
pull_request:
5+
push:
6+
7+
jobs:
8+
phpunit:
9+
name: "Behat tests"
10+
11+
runs-on: ${{ matrix.operating-system }}
12+
13+
strategy:
14+
matrix:
15+
dependencies:
16+
- "locked"
17+
php-version:
18+
- "8.1"
19+
operating-system:
20+
- "ubuntu-latest"
21+
22+
steps:
23+
- name: "Checkout"
24+
uses: "actions/checkout@v3"
25+
26+
- name: "Install PHP"
27+
uses: "shivammathur/[email protected]"
28+
with:
29+
php-version: "${{ matrix.php-version }}"
30+
ini-values: memory_limit=-1
31+
tools: composer:v2, cs2pr
32+
33+
- name: Get composer cache directory
34+
id: composer-cache
35+
run: echo "::set-output name=dir::$(composer config cache-files-dir)"
36+
37+
- name: "Cache dependencies"
38+
uses: "actions/[email protected]"
39+
with:
40+
path: ${{ steps.composer-cache.outputs.dir }}
41+
key: "php-${{ matrix.php-version }}-composer-${{ matrix.dependencies }}-${{ hashFiles('**/composer.lock') }}"
42+
restore-keys: "php-${{ matrix.php-version }}-composer-${{ matrix.dependencies }}-"
43+
44+
- name: "Install lowest dependencies"
45+
if: ${{ matrix.dependencies == 'lowest' }}
46+
run: "composer update --prefer-lowest --no-interaction --no-progress"
47+
48+
- name: "Install highest dependencies"
49+
if: ${{ matrix.dependencies == 'highest' }}
50+
run: "composer update --no-interaction --no-progress"
51+
52+
- name: "Install locked dependencies"
53+
if: ${{ matrix.dependencies == 'locked' }}
54+
run: "composer install --no-interaction --no-progress"
55+
56+
- name: "Install development dependencies"
57+
if: ${{ matrix.dependencies == 'development' }}
58+
run: "composer config minimum-stability dev && composer update --no-interaction --no-progress"
59+
60+
- name: "Tests"
61+
run: "make behat"

Makefile

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PARALLELISM := $(shell nproc)
22

33
.PHONY: all
4-
all: install phpcbf phpcs phpstan phpunit infection
4+
all: install phpcbf phpcs phpstan phpunit behat infection
55

66
.PHONY: install
77
install: vendor/composer/installed.json
@@ -14,14 +14,18 @@ vendor/composer/installed.json: composer.json composer.lock
1414
phpunit:
1515
@php -d zend.assertions=1 vendor/bin/phpunit --testsuite=unit
1616

17+
.PHONY: behat
18+
behat:
19+
@php -d zend.assertions=1 vendor/bin/behat
20+
1721
.PHONY: infection
1822
infection:
1923
@php -d zend.assertions=1 -d xdebug.mode=coverage vendor/bin/phpunit --testsuite=unit --coverage-xml=build/coverage-xml --log-junit=build/junit.xml $(PHPUNIT_FLAGS)
2024
@php -d zend.assertions=1 vendor/bin/infection -s --threads=$(PARALLELISM) --coverage=build $(INFECTION_FLAGS)
2125

2226
.PHONY: phpcbf
2327
phpcbf:
24-
@vendor/bin/phpcbf --parallel=$(PARALLELISM)
28+
@vendor/bin/phpcbf --parallel=$(PARALLELISM) || true
2529

2630
.PHONY: phpcs
2731
phpcs:

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
"php-64bit": "^8.1",
2323
"ext-pcntl": "*",
2424
"psr/log": "^3.0",
25+
"react/async": "^4.0",
2526
"react/socket": "^1.6"
2627
},
2728
"require-dev": {

composer.lock

Lines changed: 79 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docker-compose.yml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ services:
55
- ALLOW_ANONYMOUS_LOGIN=yes
66

77
kafka1:
8-
image: bitnami/kafka:2.8.0
8+
image: bitnami/kafka:3.2.0
99
ports:
1010
- "9093:9093"
1111
environment:
@@ -17,9 +17,7 @@ services:
1717
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
1818

1919
kafka2:
20-
image: bitnami/kafka:2.8.0
21-
profiles:
22-
- clustered
20+
image: bitnami/kafka:3.2.0
2321
ports:
2422
- "9094:9094"
2523
environment:

phpcs.xml.dist

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
<?xml version="1.0"?>
2-
<ruleset>
2+
<ruleset
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:noNamespaceSchemaLocation="vendor/squizlabs/php_codesniffer/phpcs.xsd"
5+
>
36
<arg name="basepath" value="." />
47
<arg name="extensions" value="php" />
5-
<arg name="parallel" value="80" />
68
<arg name="colors" />
79
<arg name="cache" value=".phpcs.cache" />
810
<arg value="np" />

src/Client/Channel.php

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Lcobucci\Kafka\Client;
5+
6+
use Closure;
7+
use Lcobucci\Kafka\Node;
8+
use Lcobucci\Kafka\Protocol\API\Request;
9+
use Lcobucci\Kafka\Protocol\API\RequestHeaders;
10+
use Lcobucci\Kafka\Protocol\Buffer;
11+
use Lcobucci\Kafka\Protocol\Schema\Parser;
12+
use Psr\Log\LoggerInterface;
13+
use React\EventLoop\Loop;
14+
use React\EventLoop\LoopInterface;
15+
use React\Promise\Deferred;
16+
use React\Promise\PromiseInterface;
17+
use React\Socket\ConnectionInterface;
18+
use React\Socket\ConnectorInterface;
19+
use SplPriorityQueue;
20+
use SplQueue;
21+
use Throwable;
22+
23+
use function assert;
24+
use function min;
25+
26+
final class Channel
27+
{
28+
/** @var SplPriorityQueue<int, array{0: array{0: Request, 1: int, 2: string}, 1: Deferred}> */
29+
private SplPriorityQueue $processingQueue;
30+
/** @var SplQueue<array{0: RequestHeaders, 1: Deferred}> */
31+
private SplQueue $inFlightQueue;
32+
private ?ConnectionInterface $connection = null;
33+
private readonly LoopInterface $loop;
34+
35+
public function __construct(
36+
private ConnectorInterface $connector,
37+
private LoggerInterface $logger,
38+
private Parser $schemaParser,
39+
private Node $node
40+
) {
41+
$this->loop = Loop::get();
42+
$this->processingQueue = new SplPriorityQueue();
43+
$this->inFlightQueue = new SplQueue();
44+
}
45+
46+
public function send(Request $request, int $correlation, string $client): PromiseInterface
47+
{
48+
$this->ensureConnected();
49+
50+
$deferred = new Deferred();
51+
$this->processingQueue->insert([[$request, $correlation, $client], $deferred], 0);
52+
53+
return $deferred->promise();
54+
}
55+
56+
private function ensureConnected(): void
57+
{
58+
if ($this->connection !== null) {
59+
return;
60+
}
61+
62+
$this->connect();
63+
}
64+
65+
private function connect(): void
66+
{
67+
$this->logger->info('Opening connection to node', ['node' => $this->node]);
68+
69+
$this->connector->connect($this->node->host . ':' . $this->node->port)->then(
70+
$this->initializeConnection(...),
71+
function (Throwable $throwable): void {
72+
$this->logger->error(
73+
'Error while connecting to node #' . $this->node->id,
74+
['node' => $this->node, 'exception' => $throwable]
75+
);
76+
77+
$this->loop->addTimer(1, $this->connect(...));
78+
}
79+
);
80+
}
81+
82+
public function initializeConnection(ConnectionInterface $connection): void
83+
{
84+
$this->logger->info('Connection to node established', ['node' => $this->node]);
85+
86+
$this->connection = $connection;
87+
$this->connection->on('data', $this->onData(...));
88+
$this->connection->on('error', $this->cleanUpConnection(...));
89+
$this->connection->on('close', $this->cleanUpConnection(...));
90+
91+
$this->loop->futureTick($this->processQueue(...));
92+
}
93+
94+
public function processQueue(): void
95+
{
96+
if (! $this->processingQueue->valid()) {
97+
return;
98+
}
99+
100+
$this->logger->debug('Processing message queue of node', ['node' => $this->node]);
101+
102+
for ($i = 0, $max = min(15, $this->processingQueue->count()); $i < $max; ++$i) {
103+
[[$request, $correlation, $client], $deferred] = $this->processingQueue->current();
104+
105+
$this->processingQueue->next();
106+
$headers = $this->sendMessage($request, $correlation, $client);
107+
$this->inFlightQueue->enqueue([$headers, $deferred]);
108+
}
109+
110+
$this->loop->futureTick([$this, 'processQueue']);
111+
}
112+
113+
private function sendMessage(Request $request, int $correlation, string $client): RequestHeaders
114+
{
115+
$headers = new RequestHeaders(
116+
$request->apiKey(),
117+
$request->highestSupportedVersion(),
118+
$correlation,
119+
$client,
120+
$request->responseClass()::parse(...)
121+
);
122+
123+
$header = $headers->toBuffer($this->schemaParser);
124+
$body = $request->toBuffer($this->schemaParser, $headers->apiVersion);
125+
126+
$length = Buffer::allocate(4);
127+
$length->writeInt($header->length() + $body->length());
128+
129+
assert($this->connection instanceof ConnectionInterface);
130+
$this->connection->write($length->bytes() . $header->bytes() . $body->bytes());
131+
132+
return $headers;
133+
}
134+
135+
public function onData(string $data): void
136+
{
137+
$this->logger->debug('Message received', ['node' => $this->node]);
138+
139+
[$headers, $deferred] = $this->inFlightQueue->dequeue();
140+
141+
$buffer = Buffer::fromContent($data);
142+
$length = $buffer->readInt();
143+
144+
$deferred->resolve($headers->parseResponse(Buffer::fromContent($buffer->read($length)), $this->schemaParser));
145+
}
146+
147+
public function disconnect(): void
148+
{
149+
if ($this->connection === null) {
150+
return;
151+
}
152+
153+
$this->connection->end();
154+
}
155+
156+
public function cleanUpConnection(): void
157+
{
158+
$this->logger->info('Closing connection to node', ['node' => $this->node]);
159+
160+
$this->connection = null;
161+
}
162+
}

0 commit comments

Comments
 (0)