Skip to content

Commit bb67aff

Browse files
committed
feat: additional advertised listeners/broker addresses for kafka
1 parent 35bf0cd commit bb67aff

File tree

4 files changed

+573
-23
lines changed

4 files changed

+573
-23
lines changed

docs/modules/kafka.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,15 @@ The Kafka container will be started using a custom shell script:
6161
[Init script](../../modules/kafka/kafka.go) inside_block:starterScript
6262
<!--/codeinclude-->
6363

64+
That will set the advertised listeners with these values:
65+
66+
<!--codeinclude-->
67+
[Advertised Listeners](../../modules/kafka/kafka.go) inside_block:advertisedListeners
68+
<!--/codeinclude-->
69+
70+
KafkaContainer provides methods to read the broker addresses for different
71+
connectivity environments.
72+
6473
#### Environment variables
6574

6675
The environment variables that are already set by default are:
@@ -82,3 +91,34 @@ The `Brokers(ctx)` method returns the Kafka brokers as a string slice, containin
8291
<!--codeinclude-->
8392
[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:getBrokers
8493
<!--/codeinclude-->
94+
95+
#### BrokersByHostDockerInternal
96+
97+
The `BrokersByHostDockerInternal(ctx)` method returns the Kafka brokers as a
98+
string slice, containing the hostname `host.docker.internal` and a random port
99+
defined by Kafka's public port (`19092/tcp`).
100+
101+
This method is useful when you need to run additional containers that need to
102+
connect to Kafka.
103+
104+
<!--codeinclude-->
105+
[Get Kafka brokers by host.docker.internal](../../modules/kafka/examples_test.go) inside_block:getBrokersByHostDockerInternal
106+
<!--/codeinclude-->
107+
108+
#### BrokersByContainerName
109+
110+
The `BrokersByContainerName(ctx)` method returns the Kafka brokers as a string
111+
slice, addressed by the container's name (e.g. `charming_dijkstra:19093`). This
112+
method is useful when you need to run additional containers that need to connect
113+
to Kafka.
114+
115+
To use this broker address you should run all the containers inside a docker
116+
network.
117+
118+
<!--codeinclude-->
119+
[Start Kafka inside a docker network](../../modules/kafka/examples_test.go) inside_block:getBrokersByContainerName_Kafka
120+
<!--/codeinclude-->
121+
122+
<!--codeinclude-->
123+
[Get Kafka brokers by container name](../../modules/kafka/examples_test.go) inside_block:getBrokersByContainerName_Kcat
124+
<!--/codeinclude-->

modules/kafka/examples_test.go

Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
package kafka_test
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
7+
"io"
68
"log"
9+
"strings"
710

11+
"github.com/IBM/sarama"
12+
"github.com/docker/docker/api/types/container"
813
"github.com/testcontainers/testcontainers-go"
914
"github.com/testcontainers/testcontainers-go/modules/kafka"
15+
"github.com/testcontainers/testcontainers-go/network"
16+
"github.com/testcontainers/testcontainers-go/wait"
1017
)
1118

1219
func ExampleRun() {
@@ -41,3 +48,308 @@ func ExampleRun() {
4148
// test-cluster
4249
// true
4350
}
51+
52+
func ExampleKafkaContainer_BrokersByHostDockerInternal() {
53+
ctx := context.Background()
54+
55+
kafkaContainer, err := kafka.Run(ctx,
56+
"confluentinc/confluent-local:7.5.0",
57+
kafka.WithClusterID("test-cluster"),
58+
)
59+
if err != nil {
60+
log.Fatalf("failed to start container: %s", err)
61+
}
62+
63+
// Clean up the container after
64+
defer func() {
65+
if err := kafkaContainer.Terminate(ctx); err != nil {
66+
log.Fatalf("failed to terminate container: %s", err)
67+
}
68+
}()
69+
70+
{
71+
state, err := kafkaContainer.State(ctx)
72+
if err != nil {
73+
log.Fatalf("failed to get container state: %s", err)
74+
}
75+
76+
fmt.Println(kafkaContainer.ClusterID)
77+
fmt.Println(state.Running)
78+
}
79+
80+
const topic = "example-topic"
81+
82+
// Produce a message from the host that will be read by a consumer in another docker container
83+
{
84+
brokers, err := kafkaContainer.Brokers(ctx)
85+
if err != nil {
86+
log.Fatal(err)
87+
}
88+
89+
config := sarama.NewConfig()
90+
config.Producer.Return.Successes = true
91+
producer, err := sarama.NewSyncProducer(brokers, config)
92+
if err != nil {
93+
log.Fatal(err)
94+
}
95+
96+
if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
97+
Topic: topic,
98+
Key: sarama.StringEncoder("key"),
99+
Value: sarama.StringEncoder("example_message_value"),
100+
}); err != nil {
101+
log.Fatal(err)
102+
}
103+
104+
}
105+
106+
// getBrokersByHostDockerInternal {
107+
brokers, err := kafkaContainer.BrokersByHostDockerInternal(ctx)
108+
if err != nil {
109+
log.Fatal(err)
110+
}
111+
112+
// Run another container that can connect to the kafka container via hostname "host.docker.internal"
113+
kcat, err := testcontainers.GenericContainer(
114+
ctx,
115+
testcontainers.GenericContainerRequest{
116+
ContainerRequest: testcontainers.ContainerRequest{
117+
Image: "confluentinc/cp-kafkacat",
118+
Entrypoint: []string{"kafkacat"},
119+
Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"},
120+
WaitingFor: wait.ForExit(),
121+
122+
// Add host.docker.internal to the consumer container so it can contact the kafka borkers
123+
HostConfigModifier: func(hc *container.HostConfig) {
124+
hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway")
125+
},
126+
},
127+
Started: true,
128+
},
129+
)
130+
if err != nil {
131+
log.Fatalf("kafkacat error: %v", err)
132+
}
133+
134+
lr, err := kcat.Logs(ctx)
135+
if err != nil {
136+
log.Fatalf("kafkacat logs error: %v", err)
137+
}
138+
139+
logs, err := io.ReadAll(lr)
140+
if err != nil {
141+
log.Fatalf("kafkacat logs read error: %v", err)
142+
}
143+
144+
fmt.Println("read message:", string(bytes.TrimSpace(logs)))
145+
// }
146+
147+
// Output:
148+
// test-cluster
149+
// true
150+
// read message: example_message_value
151+
}
152+
153+
func ExampleKafkaContainer_BrokersByContainerName() {
154+
ctx := context.Background()
155+
156+
// getBrokersByContainerName_Kafka {
157+
net, err := network.New(ctx)
158+
if err != nil {
159+
log.Fatalf("failed to create network: %s", err)
160+
}
161+
162+
kafkaContainer, err := kafka.Run(ctx,
163+
"confluentinc/confluent-local:7.5.0",
164+
kafka.WithClusterID("test-cluster"),
165+
network.WithNetwork(nil, net), // Run kafka test container in a new docker network
166+
)
167+
if err != nil {
168+
log.Fatalf("failed to start container: %s", err)
169+
}
170+
// }
171+
172+
// Clean up the container after
173+
defer func() {
174+
if err := kafkaContainer.Terminate(ctx); err != nil {
175+
log.Fatalf("failed to terminate container: %s", err)
176+
}
177+
}()
178+
179+
{
180+
state, err := kafkaContainer.State(ctx)
181+
if err != nil {
182+
log.Fatalf("failed to get container state: %s", err)
183+
}
184+
185+
fmt.Println(kafkaContainer.ClusterID)
186+
fmt.Println(state.Running)
187+
}
188+
189+
const topic = "example-topic"
190+
191+
// Produce a message from the host that will be read by a consumer in another docker container
192+
{
193+
brokers, err := kafkaContainer.Brokers(ctx)
194+
if err != nil {
195+
log.Fatal(err)
196+
}
197+
198+
config := sarama.NewConfig()
199+
config.Producer.Return.Successes = true
200+
producer, err := sarama.NewSyncProducer(brokers, config)
201+
if err != nil {
202+
log.Fatal(err)
203+
}
204+
205+
if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
206+
Topic: topic,
207+
Key: sarama.StringEncoder("key"),
208+
Value: sarama.StringEncoder("example_message_value"),
209+
}); err != nil {
210+
log.Fatal(err)
211+
}
212+
}
213+
214+
// getBrokersByContainerName_Kcat {
215+
brokers, err := kafkaContainer.BrokersByContainerName(ctx)
216+
if err != nil {
217+
log.Fatal(err)
218+
}
219+
220+
// Run another container that can connect to the kafka container via the kafka containers name
221+
kcat, err := testcontainers.GenericContainer(
222+
ctx,
223+
testcontainers.GenericContainerRequest{
224+
ContainerRequest: testcontainers.ContainerRequest{
225+
Image: "confluentinc/cp-kafkacat",
226+
Entrypoint: []string{"kafkacat"},
227+
Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"},
228+
WaitingFor: wait.ForExit(),
229+
Networks: []string{net.Name}, // Run kafkacat in the same docker network as the testcontainer
230+
},
231+
Started: true,
232+
},
233+
)
234+
if err != nil {
235+
log.Fatalf("kafkacat error: %v", err)
236+
}
237+
238+
lr, err := kcat.Logs(ctx)
239+
if err != nil {
240+
log.Fatalf("kafkacat logs error: %v", err)
241+
}
242+
243+
logs, err := io.ReadAll(lr)
244+
if err != nil {
245+
log.Fatalf("kafkacat logs read error: %v", err)
246+
}
247+
248+
fmt.Println("read message:", string(bytes.TrimSpace(logs)))
249+
// }
250+
251+
// Output:
252+
// test-cluster
253+
// true
254+
// read message: example_message_value
255+
}
256+
257+
func ExampleKafkaContainer_BrokersByContainerId() {
258+
ctx := context.Background()
259+
260+
net, err := network.New(ctx)
261+
if err != nil {
262+
log.Fatalf("failed to create network: %s", err)
263+
}
264+
265+
kafkaContainer, err := kafka.Run(ctx,
266+
"confluentinc/confluent-local:7.5.0",
267+
kafka.WithClusterID("test-cluster"),
268+
network.WithNetwork(nil, net), // Run kafka test container in a new docker network
269+
)
270+
if err != nil {
271+
log.Fatalf("failed to start container: %s", err)
272+
}
273+
274+
// Clean up the container after
275+
defer func() {
276+
if err := kafkaContainer.Terminate(ctx); err != nil {
277+
log.Fatalf("failed to terminate container: %s", err)
278+
}
279+
}()
280+
281+
{
282+
state, err := kafkaContainer.State(ctx)
283+
if err != nil {
284+
log.Fatalf("failed to get container state: %s", err)
285+
}
286+
287+
fmt.Println(kafkaContainer.ClusterID)
288+
fmt.Println(state.Running)
289+
}
290+
291+
const topic = "example-topic"
292+
293+
// Produce a message from the host that will be read by a consumer in another docker container
294+
{
295+
brokers, err := kafkaContainer.Brokers(ctx)
296+
if err != nil {
297+
log.Fatal(err)
298+
}
299+
300+
config := sarama.NewConfig()
301+
config.Producer.Return.Successes = true
302+
producer, err := sarama.NewSyncProducer(brokers, config)
303+
if err != nil {
304+
log.Fatal(err)
305+
}
306+
307+
if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
308+
Topic: topic,
309+
Key: sarama.StringEncoder("key"),
310+
Value: sarama.StringEncoder("example_message_value"),
311+
}); err != nil {
312+
log.Fatal(err)
313+
}
314+
}
315+
316+
brokers, err := kafkaContainer.BrokersByContainerId(ctx)
317+
if err != nil {
318+
log.Fatal(err)
319+
}
320+
321+
// Run another container that can connect to the kafka container via the kafka containers ContainerID
322+
kcat, err := testcontainers.GenericContainer(
323+
ctx,
324+
testcontainers.GenericContainerRequest{
325+
ContainerRequest: testcontainers.ContainerRequest{
326+
Image: "confluentinc/cp-kafkacat",
327+
Entrypoint: []string{"kafkacat"},
328+
Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"},
329+
WaitingFor: wait.ForExit(),
330+
Networks: []string{net.Name}, // Run kafkacat in the same docker network as the testcontainer
331+
},
332+
Started: true,
333+
},
334+
)
335+
if err != nil {
336+
log.Fatalf("kafkacat error: %v", err)
337+
}
338+
339+
lr, err := kcat.Logs(ctx)
340+
if err != nil {
341+
log.Fatalf("kafkacat logs error: %v", err)
342+
}
343+
344+
logs, err := io.ReadAll(lr)
345+
if err != nil {
346+
log.Fatalf("kafkacat logs read error: %v", err)
347+
}
348+
349+
fmt.Println("read message:", string(bytes.TrimSpace(logs)))
350+
351+
// Output:
352+
// test-cluster
353+
// true
354+
// read message: example_message_value
355+
}

0 commit comments

Comments
 (0)