Skip to content

Commit aaa0585

Browse files
committed
[ci][fluss] Add MySQL to Fluss E2e IT case
Signed-off-by: yuxiqian <[email protected]>
1 parent 2139824 commit aaa0585

File tree

5 files changed

+447
-1
lines changed

5 files changed

+447
-1
lines changed

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ limitations under the License.
4242
<maven.plugin.download.version>1.6.8</maven.plugin.download.version>
4343
<iceberg.version>1.6.1</iceberg.version>
4444
<hive.version>2.3.9</hive.version>
45+
<fluss.version>0.7.0</fluss.version>
4546
</properties>
4647

4748
<dependencies>
@@ -547,6 +548,24 @@ limitations under the License.
547548
<outputDirectory>${project.build.directory}/dependencies
548549
</outputDirectory>
549550
</artifactItem>
551+
<artifactItem>
552+
<groupId>org.apache.flink</groupId>
553+
<artifactId>flink-cdc-pipeline-connector-fluss</artifactId>
554+
<version>${project.version}</version>
555+
<destFileName>fluss-cdc-pipeline-connector.jar</destFileName>
556+
<type>jar</type>
557+
<outputDirectory>${project.build.directory}/dependencies
558+
</outputDirectory>
559+
</artifactItem>
560+
<artifactItem>
561+
<groupId>com.alibaba.fluss</groupId>
562+
<artifactId>fluss-flink-${flink.major.version}</artifactId>
563+
<version>${fluss.version}</version>
564+
<destFileName>fluss-sql-connector.jar</destFileName>
565+
<type>jar</type>
566+
<outputDirectory>${project.build.directory}/dependencies
567+
</outputDirectory>
568+
</artifactItem>
550569
<artifactItem>
551570
<groupId>org.apache.flink</groupId>
552571
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.pipeline.tests;
19+
20+
import org.apache.flink.cdc.common.test.utils.TestUtils;
21+
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
22+
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
23+
24+
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
25+
26+
import org.assertj.core.api.Assertions;
27+
import org.junit.jupiter.api.AfterEach;
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.params.ParameterizedTest;
30+
import org.junit.jupiter.params.provider.ValueSource;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
import org.testcontainers.containers.GenericContainer;
34+
import org.testcontainers.containers.output.Slf4jLogConsumer;
35+
import org.testcontainers.images.builder.Transferable;
36+
import org.testcontainers.junit.jupiter.Container;
37+
import org.testcontainers.junit.jupiter.Testcontainers;
38+
39+
import java.nio.file.Path;
40+
import java.sql.Connection;
41+
import java.sql.DriverManager;
42+
import java.sql.SQLException;
43+
import java.sql.Statement;
44+
import java.time.Duration;
45+
import java.util.Arrays;
46+
import java.util.Collections;
47+
import java.util.List;
48+
import java.util.stream.Collectors;
49+
50+
/** An End-to-end test case for Fluss pipeline connector. */
51+
@Testcontainers
52+
public class FlussE2eITCase extends PipelineTestEnvironment {
53+
54+
private static final Logger LOG = LoggerFactory.getLogger(FlussE2eITCase.class);
55+
private static final Duration FLUSS_TESTCASE_TIMEOUT = Duration.ofMinutes(3);
56+
private static final String flussImageTag = "fluss/fluss:0.7.0";
57+
private static final String zooKeeperImageTag = "zookeeper:3.9.2";
58+
59+
private static final List<String> flussCoordinatorProperties =
60+
Arrays.asList(
61+
"zookeeper.address: zookeeper:2181",
62+
"bind.listeners: INTERNAL://coordinator-server:0, CLIENT://coordinator-server:9123",
63+
"internal.listener.name: INTERNAL",
64+
"remote.data.dir: /tmp/fluss/remote-data",
65+
"# security properties",
66+
"security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT",
67+
"security.sasl.enabled.mechanisms: PLAIN",
68+
"security.sasl.plain.jaas.config: com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin=\"admin-pass\" user_developer=\"developer-pass\";",
69+
"#authorizer.enabled: true",
70+
"super.users: User:admin");
71+
72+
private static final List<String> flussTabletServerProperties =
73+
Arrays.asList(
74+
"zookeeper.address: zookeeper:2181",
75+
"bind.listeners: INTERNAL://tablet-server:0, CLIENT://tablet-server:9123",
76+
"internal.listener.name: INTERNAL",
77+
"tablet-server.id: 0",
78+
"kv.snapshot.interval: 0s",
79+
"data.dir: /tmp/fluss/data",
80+
"remote.data.dir: /tmp/fluss/remote-data",
81+
"# security properties",
82+
"security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT",
83+
"security.sasl.enabled.mechanisms: PLAIN",
84+
"security.sasl.plain.jaas.config: com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin=\"admin-pass\" user_developer=\"developer-pass\";",
85+
"#authorizer.enabled: true",
86+
"super.users: User:admin");
87+
88+
@Container
89+
private static final GenericContainer<?> ZOOKEEPER =
90+
new GenericContainer<>(zooKeeperImageTag)
91+
.withNetworkAliases("zookeeper")
92+
.withExposedPorts(2181)
93+
.withNetwork(NETWORK)
94+
.withLogConsumer(new Slf4jLogConsumer(LOG));
95+
96+
@Container
97+
private static final GenericContainer<?> FLUSS_COORDINATOR =
98+
new GenericContainer<>(flussImageTag)
99+
.withEnv(
100+
ImmutableMap.of(
101+
"FLUSS_PROPERTIES",
102+
String.join("\n", flussCoordinatorProperties)))
103+
.withCommand("coordinatorServer")
104+
.withNetworkAliases("coordinator-server")
105+
.withExposedPorts(9123)
106+
.withNetwork(NETWORK)
107+
.dependsOn(ZOOKEEPER)
108+
.withLogConsumer(new Slf4jLogConsumer(LOG));
109+
110+
@Container
111+
private static final GenericContainer<?> FLUSS_TABLET_SERVER =
112+
new GenericContainer<>(flussImageTag)
113+
.withEnv(
114+
ImmutableMap.of(
115+
"FLUSS_PROPERTIES",
116+
String.join("\n", flussTabletServerProperties)))
117+
.withCommand("tabletServer")
118+
.withNetworkAliases("tablet-server")
119+
.withExposedPorts(9123)
120+
.withNetwork(NETWORK)
121+
.dependsOn(ZOOKEEPER, FLUSS_COORDINATOR)
122+
.withLogConsumer(new Slf4jLogConsumer(LOG));
123+
124+
protected final UniqueDatabase inventoryDatabaseWithPrimaryKey =
125+
new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
126+
127+
protected final UniqueDatabase inventoryDatabaseWithoutPrimaryKey =
128+
new UniqueDatabase(
129+
MYSQL, "mysql_inventory_wo_pk", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
130+
131+
@Override
132+
protected List<String> copyJarToFlinkLib() {
133+
return Collections.singletonList("fluss-sql-connector.jar");
134+
}
135+
136+
@BeforeEach
137+
public void before() throws Exception {
138+
super.before();
139+
inventoryDatabaseWithPrimaryKey.createAndInitialize();
140+
inventoryDatabaseWithoutPrimaryKey.createAndInitialize();
141+
}
142+
143+
@AfterEach
144+
public void after() {
145+
super.after();
146+
inventoryDatabaseWithPrimaryKey.dropDatabase();
147+
inventoryDatabaseWithoutPrimaryKey.dropDatabase();
148+
}
149+
150+
@ParameterizedTest(name = "PkTable: {0}")
151+
@ValueSource(booleans = {true, false})
152+
void testMySqlToFluss(boolean hasPrimaryKey) throws Exception {
153+
UniqueDatabase inventoryDatabase =
154+
hasPrimaryKey
155+
? inventoryDatabaseWithPrimaryKey
156+
: inventoryDatabaseWithoutPrimaryKey;
157+
String database = inventoryDatabase.getDatabaseName();
158+
String pipelineJob =
159+
String.format(
160+
"source:\n"
161+
+ " type: mysql\n"
162+
+ " hostname: %s\n"
163+
+ " port: 3306\n"
164+
+ " username: %s\n"
165+
+ " password: %s\n"
166+
+ " tables: %s.\\.*\n"
167+
+ " server-id: 5400-5404\n"
168+
+ " server-time-zone: UTC\n"
169+
+ " scan.incremental.snapshot.chunk.key-column: %s.\\.*:id\n"
170+
+ "\n"
171+
+ "sink:\n"
172+
+ " type: fluss\n"
173+
+ " bootstrap.servers: coordinator-server:9123\n"
174+
+ " properties.client.security.protocol: sasl\n"
175+
+ " properties.client.security.sasl.mechanism: PLAIN\n"
176+
+ " properties.client.security.sasl.username: developer\n"
177+
+ " properties.client.security.sasl.password: developer-pass\n"
178+
+ "\n"
179+
+ "pipeline:\n"
180+
+ " parallelism: %d",
181+
INTER_CONTAINER_MYSQL_ALIAS,
182+
MYSQL_TEST_USER,
183+
MYSQL_TEST_PASSWORD,
184+
database,
185+
database,
186+
parallelism);
187+
Path flussConnector = TestUtils.getResource("fluss-cdc-pipeline-connector.jar");
188+
submitPipelineJob(pipelineJob, flussConnector);
189+
waitUntilJobRunning(Duration.ofSeconds(30));
190+
LOG.info("Pipeline job is running");
191+
192+
validateSinkResult(
193+
database,
194+
"products",
195+
Arrays.asList(
196+
"101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
197+
"102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
198+
"103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
199+
"104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
200+
"105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
201+
"106, hammer, 16oz carpenter's hammer, 1.0, null, null, null",
202+
"107, rocks, box of assorted rocks, 5.3, null, null, null",
203+
"108, jacket, water resistent black wind breaker, 0.1, null, null, null",
204+
"109, spare tire, 24 inch spare tire, 22.2, null, null, null"));
205+
206+
validateSinkResult(
207+
database,
208+
"customers",
209+
Arrays.asList(
210+
"101, user_1, Shanghai, 123567891234",
211+
"102, user_2, Shanghai, 123567891234",
212+
"103, user_3, Shanghai, 123567891234",
213+
"104, user_4, Shanghai, 123567891234"));
214+
215+
if (!hasPrimaryKey) {
216+
// Non-primary key does not support deleting rows for now.
217+
return;
218+
}
219+
220+
String mysqlJdbcUrl =
221+
String.format(
222+
"jdbc:mysql://%s:%s/%s",
223+
MYSQL.getHost(),
224+
MYSQL.getDatabasePort(),
225+
inventoryDatabase.getDatabaseName());
226+
227+
// Fluss does not support applying DDL events for now.
228+
try (Connection conn =
229+
DriverManager.getConnection(
230+
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
231+
Statement stat = conn.createStatement()) {
232+
stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
233+
stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
234+
stat.execute("DELETE FROM products WHERE id=111;");
235+
stat.execute(
236+
"INSERT INTO products VALUES (default,'jacket','water resistant white wind breaker', 0.2, null, null, null);");
237+
stat.execute(
238+
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter', 5.18, null, null, null);");
239+
} catch (SQLException e) {
240+
LOG.error("Update table for CDC failed.", e);
241+
throw e;
242+
}
243+
244+
validateSinkResult(
245+
database,
246+
"products",
247+
Arrays.asList(
248+
"101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
249+
"102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
250+
"103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
251+
"104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
252+
"105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
253+
"106, hammer, 18oz carpenter hammer, 1.0, null, null, null",
254+
"107, rocks, box of assorted rocks, 5.1, null, null, null",
255+
"108, jacket, water resistent black wind breaker, 0.1, null, null, null",
256+
"109, spare tire, 24 inch spare tire, 22.2, null, null, null",
257+
"110, jacket, water resistant white wind breaker, 0.2, null, null, null",
258+
"111, scooter, Big 2-wheel scooter, 5.18, null, null, null"));
259+
}
260+
261+
private List<String> fetchFlussTableRows(String database, String table, int rowCount)
262+
throws Exception {
263+
String template =
264+
readLines("docker/peek-fluss.sql").stream()
265+
.filter(line -> !line.startsWith("--"))
266+
.collect(Collectors.joining("\n"));
267+
String sql = String.format(template, database, table, rowCount);
268+
String containerSqlPath = sharedVolume.toString() + "/peek.sql";
269+
jobManager.copyFileToContainer(Transferable.of(sql), containerSqlPath);
270+
271+
org.testcontainers.containers.Container.ExecResult result =
272+
jobManager.execInContainer("/opt/flink/bin/sql-client.sh", "-f", containerSqlPath);
273+
if (result.getExitCode() != 0) {
274+
throw new RuntimeException(
275+
"Failed to execute peek script. Stdout: "
276+
+ result.getStdout()
277+
+ "; Stderr: "
278+
+ result.getStderr());
279+
}
280+
281+
return Arrays.stream(result.getStdout().split("\n"))
282+
.filter(line -> line.startsWith("|"))
283+
.skip(1)
284+
.map(FlussE2eITCase::extractRow)
285+
.map(row -> String.format("%s", String.join(", ", row)))
286+
.collect(Collectors.toList());
287+
}
288+
289+
private static String[] extractRow(String row) {
290+
return Arrays.stream(row.split("\\|"))
291+
.map(String::trim)
292+
.filter(col -> !col.isEmpty())
293+
.map(col -> col.equals("<NULL>") ? "null" : col)
294+
.toArray(String[]::new);
295+
}
296+
297+
private void validateSinkResult(String database, String table, List<String> expected)
298+
throws InterruptedException {
299+
LOG.info("Verifying Fluss {}::{} results...", database, table);
300+
long deadline = System.currentTimeMillis() + FLUSS_TESTCASE_TIMEOUT.toMillis();
301+
List<String> results = Collections.emptyList();
302+
int rowCount = expected.size();
303+
while (System.currentTimeMillis() < deadline) {
304+
try {
305+
results = fetchFlussTableRows(database, table, rowCount);
306+
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
307+
LOG.info(
308+
"Successfully verified {} records in {} seconds.",
309+
expected.size(),
310+
(System.currentTimeMillis() - deadline + FLUSS_TESTCASE_TIMEOUT.toMillis())
311+
/ 1000);
312+
return;
313+
} catch (Exception e) {
314+
LOG.warn("Validate failed, waiting for the next loop...", e);
315+
} catch (AssertionError ignored) {
316+
// AssertionError contains way too much records and might flood the log output.
317+
LOG.warn(
318+
"Results mismatch, expected {} records, but got {} actually. Waiting for the next loop...",
319+
expected.size(),
320+
results.size());
321+
}
322+
Thread.sleep(1000L);
323+
}
324+
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
325+
}
326+
}

0 commit comments

Comments
 (0)