2
2
3
3
import com .github .dockerjava .api .command .InspectContainerResponse ;
4
4
import lombok .SneakyThrows ;
5
- import org .testcontainers .images .builder .Transferable ;
6
5
import org .testcontainers .utility .DockerImageName ;
7
6
8
- import java .nio .charset .StandardCharsets ;
9
- import java .util .Comparator ;
10
-
11
7
/**
12
8
* This container wraps Confluent Kafka and Zookeeper (optionally)
13
9
*
@@ -17,20 +13,14 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
17
13
private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName .parse ("confluentinc/cp-kafka" );
18
14
private static final String DEFAULT_TAG = "5.4.3" ;
19
15
20
- private static final String STARTER_SCRIPT = "/testcontainers_start.sh" ;
21
-
22
16
public static final int KAFKA_PORT = 9093 ;
23
17
24
18
public static final int ZOOKEEPER_PORT = 2181 ;
25
19
26
20
private static final String DEFAULT_INTERNAL_TOPIC_RF = "1" ;
27
21
28
- private static final int PORT_NOT_ASSIGNED = -1 ;
29
-
30
22
protected String externalZookeeperConnect = null ;
31
23
32
- private int port = PORT_NOT_ASSIGNED ;
33
-
34
24
/**
35
25
* @deprecated use {@link KafkaContainer(DockerImageName)} instead
36
26
*/
@@ -80,83 +70,59 @@ public KafkaContainer withExternalZookeeper(String connectString) {
80
70
}
81
71
82
72
public String getBootstrapServers () {
83
- if (port == PORT_NOT_ASSIGNED ) {
84
- throw new IllegalStateException ("You should start Kafka container first" );
85
- }
86
- return String .format ("PLAINTEXT://%s:%s" , getHost (), port );
87
- }
88
-
89
- @ Override
90
- protected void doStart () {
91
- withCommand ("sh" , "-c" , "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT );
92
-
93
- if (externalZookeeperConnect == null ) {
94
- addExposedPort (ZOOKEEPER_PORT );
95
- }
96
-
97
- super .doStart ();
73
+ return String .format ("PLAINTEXT://%s:%s" , getHost (), getMappedPort (KAFKA_PORT ));
98
74
}
99
75
100
76
@ Override
101
- @ SneakyThrows
102
- protected void containerIsStarting (InspectContainerResponse containerInfo , boolean reused ) {
103
- super .containerIsStarting (containerInfo , reused );
104
-
105
- port = getMappedPort (KAFKA_PORT );
106
-
107
- if (reused ) {
108
- return ;
109
- }
77
+ protected void configure () {
78
+ withEnv (
79
+ "KAFKA_ADVERTISED_LISTENERS" ,
80
+ String .format (
81
+ "BROKER://%s:9092" ,
82
+ getNetwork () != null
83
+ ? getNetworkAliases ().get (0 )
84
+ : "localhost"
85
+ )
86
+ );
110
87
111
88
String command = "#!/bin/bash\n " ;
112
- final String zookeeperConnect ;
113
89
if (externalZookeeperConnect != null ) {
114
- zookeeperConnect = externalZookeeperConnect ;
90
+ withEnv ( "KAFKA_ZOOKEEPER_CONNECT" , externalZookeeperConnect ) ;
115
91
} else {
116
- zookeeperConnect = "localhost:" + ZOOKEEPER_PORT ;
92
+ addExposedPort (ZOOKEEPER_PORT );
93
+ withEnv ("KAFKA_ZOOKEEPER_CONNECT" , "localhost:" + ZOOKEEPER_PORT );
117
94
command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n " ;
118
95
command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n " ;
119
96
command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n " ;
120
97
command += "zookeeper-server-start zookeeper.properties &\n " ;
121
98
}
122
99
123
- command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n " ;
124
-
125
- command += "export KAFKA_ADVERTISED_LISTENERS='" + String .join ("," , getBootstrapServers (), brokerAdvertisedListener (containerInfo )) + "'\n " ;
126
-
127
- command += ". /etc/confluent/docker/bash-config \n " ;
128
- command += "/etc/confluent/docker/configure \n " ;
129
- command += "/etc/confluent/docker/launch \n " ;
100
+ // Optimization: skip the checks
101
+ command += "echo '' > /etc/confluent/docker/ensure \n " ;
102
+ // Run the original command
103
+ command += "/etc/confluent/docker/run \n " ;
104
+ withCommand ("sh" , "-c" , command );
105
+ }
130
106
131
- copyFileToContainer (
132
- Transferable .of (command .getBytes (StandardCharsets .UTF_8 ), 0777 ),
133
- STARTER_SCRIPT
107
+ @ Override
108
+ @ SneakyThrows
109
+ protected void containerIsStarted (InspectContainerResponse containerInfo ) {
110
+ String brokerAdvertisedListener = brokerAdvertisedListener (containerInfo );
111
+ ExecResult result = execInContainer (
112
+ "kafka-configs" ,
113
+ "--alter" ,
114
+ "--bootstrap-server" , brokerAdvertisedListener ,
115
+ "--entity-type" , "brokers" ,
116
+ "--entity-name" , getEnvMap ().get ("KAFKA_BROKER_ID" ),
117
+ "--add-config" ,
118
+ "advertised.listeners=[" + String .join ("," , getBootstrapServers (), brokerAdvertisedListener ) + "]"
134
119
);
120
+ if (result .getExitCode () != 0 ) {
121
+ throw new IllegalStateException (result .getStderr ());
122
+ }
135
123
}
136
124
137
125
protected String brokerAdvertisedListener (InspectContainerResponse containerInfo ) {
138
- // Kafka supports only one INTER_BROKER listener, so we have to pick one.
139
- // The current algorithm uses the following order of resolving the IP:
140
- // 1. Custom network's IP set via `withNetwork`
141
- // 2. Bridge network's IP
142
- // 3. Best effort fallback to getNetworkSettings#ipAddress
143
- String ipAddress = containerInfo .getNetworkSettings ().getNetworks ().entrySet ()
144
- .stream ()
145
- .filter (it -> it .getValue ().getIpAddress () != null )
146
- .max (Comparator .comparingInt (entry -> {
147
- if (getNetwork () != null && getNetwork ().getId ().equals (entry .getValue ().getNetworkID ())) {
148
- return 2 ;
149
- }
150
-
151
- if ("bridge" .equals (entry .getKey ())) {
152
- return 1 ;
153
- }
154
-
155
- return 0 ;
156
- }))
157
- .map (it -> it .getValue ().getIpAddress ())
158
- .orElseGet (() -> containerInfo .getNetworkSettings ().getIpAddress ());
159
-
160
- return String .format ("BROKER://%s:%s" , ipAddress , "9092" );
126
+ return String .format ("BROKER://%s:%s" , containerInfo .getConfig ().getHostName (), "9092" );
161
127
}
162
128
}
0 commit comments